Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ 9e900cc5

History | View | Annotate | Download (68.1 kB)

1
# -*- coding: utf-8 -*-
2
# Copyright 2011, 2012, 2013 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
import urlparse
36
import urllib
37

    
38
from astakos.im.tests.common import *
39

    
40
ui_url = lambda url: '/' + astakos_settings.BASE_PATH + '/ui/%s' % url
41

    
42

    
43
class ShibbolethTests(TestCase):
44
    """
45
    Testing shibboleth authentication.
46
    """
47

    
48
    def setUp(self):
49
        self.client = ShibbolethClient()
50
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
51
        astakos_settings.MODERATION_ENABLED = True
52

    
53
    def tearDown(self):
54
        AstakosUser.objects.all().delete()
55

    
56
    @im_settings(FORCE_PROFILE_UPDATE=False)
57
    def test_create_account(self):
58

    
59
        client = ShibbolethClient()
60

    
61
        # shibboleth views validation
62
        # eepn required
63
        r = client.get(ui_url('login/shibboleth?'), follow=True)
64
        self.assertContains(r, messages.SHIBBOLETH_MISSING_USER_ID % {
65
            'domain': astakos_settings.BASE_HOST,
66
            'contact_email': settings.CONTACT_EMAIL
67
        })
68
        client.set_tokens(remote_user="kpapeppn", eppn="kpapeppn")
69

    
70
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
71
        # shibboleth user info required
72
        r = client.get(ui_url('login/shibboleth?'), follow=True)
73
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
74
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
75

    
76
        # shibboleth logged us in
77
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
78
                          cn="Kostas Papadimitriou",
79
                          ep_affiliation="Test Affiliation")
80
        r = client.get(ui_url('login/shibboleth?'), follow=True,
81
                       **{'HTTP_SHIB_CUSTOM_IDP_KEY': 'test'})
82
        token = PendingThirdPartyUser.objects.get().token
83
        self.assertRedirects(r, ui_url('signup?third_party_token=%s' % token))
84
        self.assertEqual(r.status_code, 200)
85

    
86
        # a new pending user created
87
        pending_user = PendingThirdPartyUser.objects.get(
88
            third_party_identifier="kpapeppn")
89
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
90
        # keep the token for future use
91
        token = pending_user.token
92
        # from now on no shibboleth headers are sent to the server
93
        client.reset_tokens()
94

    
95
        # this is the old way, it should fail, to avoid pending user take over
96
        r = client.get(ui_url('shibboleth/signup/%s' % pending_user.username))
97
        self.assertEqual(r.status_code, 404)
98

    
99
        # this is the signup unique url associated with the pending user
100
        # created
101
        r = client.get(ui_url('signup/?third_party_token=%s' % token))
102
        identifier = pending_user.third_party_identifier
103
        post_data = {'third_party_identifier': identifier,
104
                     'first_name': 'Kostas',
105
                     'third_party_token': token,
106
                     'last_name': 'Mitroglou',
107
                     'provider': 'shibboleth'}
108

    
109
        signup_url = reverse('signup')
110

    
111
        # invalid request
112
        invalid_data = copy.copy(post_data)
113
        invalid_data['provider'] = ''
114
        r = client.post(signup_url, invalid_data, follow=True)
115
        self.assertRedirects(r, reverse('signup'))
116

    
117
        # invlid email
118
        post_data['email'] = 'kpap'
119
        r = client.post(signup_url, post_data)
120
        self.assertContains(r, token)
121

    
122
        # existing email
123
        existing_user = get_local_user('test@test.com')
124
        post_data['email'] = 'test@test.com'
125
        r = client.post(signup_url, post_data)
126
        self.assertContains(r, messages.EMAIL_USED)
127
        existing_user.delete()
128

    
129
        # and finally a valid signup
130
        post_data['email'] = 'kpap-takeover@synnefo.org'
131
        r = client.post(signup_url, post_data, follow=True)
132
        self.assertContains(r, messages.VERIFICATION_SENT)
133

    
134
        # takeover of the uverified the shibboleth identifier
135
        client = ShibbolethClient()
136
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
137
                          eppn="kpapeppn",
138
                          cn="Kostas Papadimitriou",
139
                          ep_affiliation="Test Affiliation")
140
        r = client.get(ui_url('login/shibboleth?'), follow=True,
141
                       **{'HTTP_SHIB_CUSTOM_IDP_KEY': 'test'})
142
        # a new pending user created, previous one was deleted
143
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
144
        pending_user = PendingThirdPartyUser.objects.get(
145
            third_party_identifier="kpapeppn")
146
        identifier = pending_user.third_party_identifier
147
        token = pending_user.token
148
        post_data = {'third_party_identifier': identifier,
149
                     'third_party_token': token}
150
        post_data['email'] = 'kpap@synnefo.org'
151
        r = client.post(signup_url, post_data)
152
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
153
        # previously unverified user associated with kpapeppn gets deleted
154
        user_qs = AstakosUser.objects.filter(email="kpap-takeover@synnefo.org")
155
        self.assertEqual(user_qs.count(), 0)
156

    
157
        # entires commited as expected
158
        self.assertEqual(AstakosUser.objects.count(), 1)
159
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
160

    
161
        user = AstakosUser.objects.get()
162
        provider = user.get_auth_provider("shibboleth")
163
        headers = provider.provider_details['info']['headers']
164
        self.assertEqual(headers.get('SHIB_CUSTOM_IDP_KEY'), 'test')
165

    
166
        # provider info stored
167
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
168
        self.assertEqual(provider.affiliation, 'Test Affiliation')
169
        self.assertEqual(provider.info['email'], u'kpap@synnefo.org')
170
        self.assertEqual(provider.info['eppn'], u'kpapeppn')
171
        self.assertEqual(provider.info['name'], u'Kostas Papadimitriou')
172
        self.assertTrue('headers' in provider.info)
173

    
174
        # login (not verified yet)
175
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
176
                          cn="Kostas Papadimitriou")
177
        r = client.get(ui_url("login/shibboleth?"), follow=True)
178
        self.assertContains(r, 'A pending registration exists for')
179
        self.assertNotContains(r, 'pending moderation')
180
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
181
        tmp_third_party = PendingThirdPartyUser.objects.get()
182

    
183
        # user gets verified
184
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
185
        backend = activation_backends.get_backend()
186
        activation_result = backend.verify_user(u, u.verification_code)
187
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
188
                          cn="Kostas Papadimitriou")
189
        r = client.get(ui_url("login/shibboleth?"), follow=True)
190
        self.assertNotContains(r, 'A pending registration exists for')
191
        self.assertContains(r, 'pending moderation')
192

    
193
        # temporary signup process continues. meanwhile the user have verified
194
        # her account. The signup process should fail
195
        tp = tmp_third_party
196
        post_data = {'third_party_identifier': tp.third_party_identifier,
197
                     'email': 'unsed-email@synnefo.org',
198
                     'third_party_token': tp.token}
199
        r = client.post(signup_url, post_data)
200
        self.assertEqual(r.status_code, 404)
201

    
202
        r = client.post(reverse('astakos.im.views.target.local.password_reset'),
203
                        {'email': 'kpap@synnefo.org'})
204
        self.assertContains(r, 'Classic login is not enabled for your account')
205

    
206
        # admin activates the user
207
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
208
        backend = activation_backends.get_backend()
209
        activation_result = backend.verify_user(u, u.verification_code)
210
        activation_result = backend.accept_user(u)
211
        self.assertFalse(activation_result.is_error())
212
        backend.send_result_notifications(activation_result, u)
213
        self.assertEqual(u.is_active, True)
214

    
215

    
216
        # we see our profile
217
        r = client.get(ui_url("login/shibboleth?"), follow=True)
218
        self.assertRedirects(r, ui_url('landing'))
219
        self.assertEqual(r.status_code, 200)
220

    
221
    def test_existing(self):
222
        """
223
        Test adding of third party login to an existing account
224
        """
225

    
226
        # this is our existing user
227
        existing_user = get_local_user('kpap@synnefo.org')
228
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
229
        existing_inactive.is_active = False
230
        existing_inactive.save()
231

    
232
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
233
        existing_unverified.is_active = False
234
        existing_unverified.activation_sent = None
235
        existing_unverified.email_verified = False
236
        existing_unverified.is_verified = False
237
        existing_unverified.save()
238

    
239
        client = ShibbolethClient()
240
        # shibboleth logged us in, notice that we use different email
241
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
242
                          cn="Kostas Papadimitriou", )
243
        r = client.get(ui_url("login/shibboleth?"), follow=True)
244

    
245
        # a new pending user created
246
        pending_user = PendingThirdPartyUser.objects.get()
247
        token = pending_user.token
248
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
249
        pending_key = pending_user.token
250
        client.reset_tokens()
251
        self.assertRedirects(r, ui_url("signup?third_party_token=%s" % token))
252

    
253
        form = r.context['login_form']
254
        signupdata = copy.copy(form.initial)
255
        signupdata['email'] = 'kpap@synnefo.org'
256
        signupdata['third_party_token'] = token
257
        signupdata['provider'] = 'shibboleth'
258
        signupdata.pop('id', None)
259

    
260
        # the email exists to another user
261
        r = client.post(ui_url("signup"), signupdata)
262
        self.assertContains(r, "There is already an account with this email "
263
                               "address")
264
        # change the case, still cannot create
265
        signupdata['email'] = 'KPAP@synnefo.org'
266
        r = client.post(ui_url("signup"), signupdata)
267
        self.assertContains(r, "There is already an account with this email "
268
                               "address")
269
        # inactive user
270
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
271
        r = client.post(ui_url("signup"), signupdata)
272
        self.assertContains(r, "There is already an account with this email "
273
                               "address")
274

    
275
        # unverified user, this should pass, old entry will be deleted
276
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
277
        r = client.post(ui_url("signup"), signupdata)
278

    
279
        post_data = {'password': 'password',
280
                     'username': 'kpap@synnefo.org'}
281
        r = client.post(ui_url('local'), post_data, follow=True)
282
        self.assertTrue(r.context['request'].user.is_authenticated())
283
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
284
                          cn="Kostas Papadimitriou", )
285
        r = client.get(ui_url("login/shibboleth?"), follow=True)
286
        self.assertContains(r, "enabled for this account")
287
        client.reset_tokens()
288

    
289
        user = existing_user
290
        self.assertTrue(user.has_auth_provider('shibboleth'))
291
        self.assertTrue(user.has_auth_provider('local',
292
                                               auth_backend='astakos'))
293
        client.logout()
294

    
295
        # look Ma, i can login with both my shibboleth and local account
296
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
297
                          cn="Kostas Papadimitriou")
298
        r = client.get(ui_url("login/shibboleth?"), follow=True)
299
        self.assertTrue(r.context['request'].user.is_authenticated())
300
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
301
        self.assertRedirects(r, ui_url('landing'))
302
        self.assertEqual(r.status_code, 200)
303

    
304
        user = r.context['request'].user
305
        client.logout()
306
        client.reset_tokens()
307

    
308
        # logged out
309
        r = client.get(ui_url("profile"), follow=True)
310
        self.assertFalse(r.context['request'].user.is_authenticated())
311

    
312
        # login with local account also works
313
        post_data = {'password': 'password',
314
                     'username': 'kpap@synnefo.org'}
315
        r = self.client.post(ui_url('local'), post_data, follow=True)
316
        self.assertTrue(r.context['request'].user.is_authenticated())
317
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
318
        self.assertRedirects(r, ui_url('landing'))
319
        self.assertEqual(r.status_code, 200)
320

    
321
        # cannot add the same eppn
322
        client.set_tokens(mail="secondary@shibboleth.gr",
323
                          remote_user="kpapeppn",
324
                          cn="Kostas Papadimitriou", )
325
        r = client.get(ui_url("login/shibboleth?"), follow=True)
326
        self.assertRedirects(r, ui_url('landing'))
327
        self.assertTrue(r.status_code, 200)
328
        self.assertEquals(existing_user.auth_providers.count(), 2)
329

    
330
        # only one allowed by default
331
        client.set_tokens(mail="secondary@shibboleth.gr",
332
                          remote_user="kpapeppn2",
333
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
334
        prov = auth_providers.get_provider('shibboleth')
335
        r = client.get(ui_url("login/shibboleth?"), follow=True)
336
        self.assertContains(r, "Failed to add")
337
        self.assertRedirects(r, ui_url('profile'))
338
        self.assertTrue(r.status_code, 200)
339
        self.assertEquals(existing_user.auth_providers.count(), 2)
340
        client.logout()
341
        client.reset_tokens()
342

    
343
        # cannot login with another eppn
344
        client.set_tokens(mail="kpap@synnefo.org",
345
                          remote_user="kpapeppninvalid",
346
                          cn="Kostas Papadimitriou")
347
        r = client.get(ui_url("login/shibboleth?"), follow=True)
348
        self.assertFalse(r.context['request'].user.is_authenticated())
349

    
350
        # cannot
351

    
352
        # lets remove local password
353
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
354
                                       email="kpap@synnefo.org")
355
        remove_local_url = user.get_auth_provider('local').get_remove_url
356
        remove_shibbo_url = user.get_auth_provider('shibboleth',
357
                                                   'kpapeppn').get_remove_url
358
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
359
                          cn="Kostas Papadimtriou")
360
        r = client.get(ui_url("login/shibboleth?"), follow=True)
361
        client.reset_tokens()
362

    
363
        # only POST is allowed (for CSRF protection)
364
        r = client.get(remove_local_url, follow=True)
365
        self.assertEqual(r.status_code, 405)
366

    
367
        r = client.post(remove_local_url, follow=True)
368
        # 2 providers left
369
        self.assertEqual(user.auth_providers.count(), 1)
370
        # cannot remove last provider
371
        r = client.post(remove_shibbo_url)
372
        self.assertEqual(r.status_code, 403)
373
        self.client.logout()
374

    
375
        # cannot login using local credentials (notice we use another client)
376
        post_data = {'password': 'password',
377
                     'username': 'kpap@synnefo.org'}
378
        r = self.client.post(ui_url('local'), post_data, follow=True)
379
        self.assertFalse(r.context['request'].user.is_authenticated())
380

    
381
        # we can reenable the local provider by setting a password
382
        r = client.get(ui_url("password_change"), follow=True)
383
        r = client.post(ui_url("password_change"), {'new_password1': '111',
384
                                                    'new_password2': '111'},
385
                        follow=True)
386
        user = r.context['request'].user
387
        self.assertTrue(user.has_auth_provider('local'))
388
        self.assertTrue(user.has_auth_provider('shibboleth'))
389
        self.assertTrue(user.check_password('111'))
390
        self.assertTrue(user.has_usable_password())
391

    
392
        # change password via profile form
393
        r = client.post(ui_url("profile"), {
394
            'old_password': '111',
395
            'new_password': '',
396
            'new_password2': '',
397
            'change_password': 'on',
398
        }, follow=False)
399
        self.assertEqual(r.status_code, 200)
400
        self.assertFalse(r.context['profile_form'].is_valid())
401

    
402
        self.client.logout()
403

    
404
        # now we can login
405
        post_data = {'password': '111',
406
                     'username': 'kpap@synnefo.org'}
407
        r = self.client.post(ui_url('local'), post_data, follow=True)
408
        self.assertTrue(r.context['request'].user.is_authenticated())
409

    
410
        client.reset_tokens()
411

    
412
        # we cannot take over another shibboleth identifier
413
        user2 = get_local_user('another@synnefo.org')
414
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
415
        # login
416
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
417
                          cn="Kostas Papadimitriou")
418
        r = client.get(ui_url("login/shibboleth?"), follow=True)
419
        # try to assign existing shibboleth identifier of another user
420
        client.set_tokens(mail="kpap_second@shibboleth.gr",
421
                          remote_user="existingeppn",
422
                          cn="Kostas Papadimitriou")
423
        r = client.get(ui_url("login/shibboleth?"), follow=True)
424
        self.assertContains(r, "is already in use")
425

    
426

    
427
class TestLocal(TestCase):
428

    
429
    def setUp(self):
430
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
431
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
432
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
433
        settings.ASTAKOS_MODERATION_ENABLED = True
434

    
435
    def tearDown(self):
436
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
437
        AstakosUser.objects.all().delete()
438

    
439
    @im_settings(RECAPTCHA_ENABLED=True, RATELIMIT_RETRIES_ALLOWED=3)
440
    def test_login_ratelimit(self):
441
        from django.core.cache import cache
442
        cache.clear()
443
        [cache.delete(key) for key in cache._cache.keys()]
444

    
445
        credentials = {'username': 'γιού τι έφ', 'password': 'password'}
446
        r = self.client.post(ui_url('local'), credentials, follow=True)
447
        fields = r.context['login_form'].fields.keyOrder
448
        self.assertFalse('recaptcha_challenge_field' in fields)
449
        r = self.client.post(ui_url('local'), credentials, follow=True)
450
        fields = r.context['login_form'].fields.keyOrder
451
        self.assertFalse('recaptcha_challenge_field' in fields)
452
        r = self.client.post(ui_url('local'), credentials, follow=True)
453
        fields = r.context['login_form'].fields.keyOrder
454
        self.assertTrue('recaptcha_challenge_field' in fields)
455

    
456
    def test_no_moderation(self):
457
        # disable moderation
458
        astakos_settings.MODERATION_ENABLED = False
459

    
460
        # create a new user
461
        r = self.client.get(ui_url("signup"))
462
        self.assertEqual(r.status_code, 200)
463
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
464
                'password2': 'password', 'first_name': 'Kostas',
465
                'last_name': 'Mitroglou', 'provider': 'local'}
466
        r = self.client.post(ui_url("signup"), data)
467

    
468
        # user created
469
        self.assertEqual(AstakosUser.objects.count(), 1)
470
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
471
                                       email="kpap@synnefo.org")
472
        self.assertEqual(user.username, 'kpap@synnefo.org')
473
        self.assertEqual(user.has_auth_provider('local'), True)
474
        self.assertFalse(user.is_active)
475

    
476
        # user (but not admin) gets notified
477
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
478
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
479
        astakos_settings.MODERATION_ENABLED = True
480

    
481
    def test_email_case(self):
482
        data = {
483
            'email': 'kPap@synnefo.org',
484
            'password1': '1234',
485
            'password2': '1234'
486
        }
487

    
488
        form = forms.LocalUserCreationForm(data)
489
        self.assertTrue(form.is_valid())
490
        user = form.create_user()
491

    
492
        u = AstakosUser.objects.get()
493
        self.assertEqual(u.email, 'kPap@synnefo.org')
494
        self.assertEqual(u.username, 'kpap@synnefo.org')
495
        u.is_active = True
496
        u.email_verified = True
497
        u.save()
498

    
499
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
500
        login = forms.LoginForm(data=data)
501
        self.assertTrue(login.is_valid())
502

    
503
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
504
        login = forms.LoginForm(data=data)
505
        self.assertTrue(login.is_valid())
506

    
507
        data = {
508
            'email': 'kpap@synnefo.org',
509
            'password1': '1234',
510
            'password2': '1234'
511
        }
512
        form = forms.LocalUserCreationForm(data)
513
        self.assertFalse(form.is_valid())
514

    
515
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
516
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
517
    def test_local_provider(self):
518
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
519

    
520
        # create a user
521
        r = self.client.get(ui_url("signup"))
522
        self.assertEqual(r.status_code, 200)
523
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
524
                'password2': 'password', 'first_name': 'Kostas',
525
                'last_name': 'Mitroglou', 'provider': 'local'}
526
        r = self.client.post(ui_url("signup"), data)
527

    
528
        # user created
529
        self.assertEqual(AstakosUser.objects.count(), 1)
530
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
531
                                       email="kpap@synnefo.org")
532
        self.assertEqual(user.username, 'kpap@synnefo.org')
533
        self.assertEqual(user.has_auth_provider('local'), True)
534
        self.assertFalse(user.is_active)  # not activated
535
        self.assertFalse(user.email_verified)  # not verified
536
        self.assertTrue(user.activation_sent)  # activation automatically sent
537
        self.assertFalse(user.moderated)
538
        self.assertFalse(user.email_verified)
539

    
540
        # admin gets notified and activates the user from the command line
541
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
542
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
543
                                               'password': 'password'},
544
                             follow=True)
545
        self.assertContains(r, messages.VERIFICATION_SENT)
546
        backend = activation_backends.get_backend()
547

    
548
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
549
        backend.send_user_verification_email(user)
550

    
551
        # user activation fields updated and user gets notified via email
552
        user = AstakosUser.objects.get(pk=user.pk)
553
        self.assertTrue(user.activation_sent)
554
        self.assertFalse(user.email_verified)
555
        self.assertFalse(user.is_active)
556
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
557

    
558
        # user forgot she got registered and tries to submit registration
559
        # form. Notice the upper case in email
560
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
561
                'password2': 'password', 'first_name': 'Kostas',
562
                'last_name': 'Mitroglou', 'provider': 'local'}
563
        r = self.client.post(ui_url("signup"), data, follow=True)
564
        self.assertRedirects(r, reverse('login'))
565
        self.assertContains(r, messages.VERIFICATION_SENT)
566

    
567
        user = AstakosUser.objects.get()
568
        # previous user replaced
569
        self.assertTrue(user.activation_sent)
570
        self.assertFalse(user.email_verified)
571
        self.assertFalse(user.is_active)
572
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
573

    
574
        # hmmm, email exists; lets request a password change
575
        r = self.client.get(ui_url('local/password_reset'))
576
        self.assertEqual(r.status_code, 200)
577
        data = {'email': 'kpap@synnefo.org'}
578
        r = self.client.post(ui_url('local/password_reset'), data, follow=True)
579
        # she can't because account is not active yet
580
        self.assertContains(r, 'pending email verification')
581

    
582
        # moderation is enabled and an activation email has already been sent
583
        # so user can trigger resend of the activation email
584
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
585
                            follow=True)
586
        self.assertContains(r, 'has been sent to your email address.')
587
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
588

    
589
        # also she cannot login
590
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
591
        r = self.client.post(ui_url('local'), data, follow=True)
592
        self.assertContains(r, 'Resend verification')
593
        self.assertFalse(r.context['request'].user.is_authenticated())
594
        self.assertFalse('_pithos2_a' in self.client.cookies)
595

    
596
        # user sees the message and resends activation
597
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
598
                            follow=True)
599
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
600

    
601
        # logged in user cannot activate another account
602
        tmp_user = get_local_user("test_existing_user@synnefo.org")
603
        tmp_client = Client()
604
        tmp_client.login(username="test_existing_user@synnefo.org",
605
                         password="password")
606
        r = tmp_client.get(user.get_activation_url(), follow=True)
607
        self.assertContains(r, messages.LOGGED_IN_WARNING)
608

    
609
        # empty activation code is not allowed
610
        r = self.client.get(user.get_activation_url().split("?")[0],
611
                            follow=True)
612
        self.assertEqual(r.status_code, 403)
613

    
614
        r = self.client.get(user.get_activation_url(), follow=True)
615
        # previous code got invalidated
616
        self.assertRedirects(r, reverse('login'))
617
        self.assertContains(r, astakos_messages.INVALID_ACTIVATION_KEY)
618
        self.assertEqual(r.status_code, 200)
619

    
620
        user = AstakosUser.objects.get(pk=user.pk)
621
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
622
        r = self.client.get(user.get_activation_url(), follow=True)
623
        self.assertRedirects(r, reverse('login'))
624
        # user sees that account is pending approval from admins
625
        self.assertContains(r, messages.NOTIFICATION_SENT)
626
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
627

    
628
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
629
        result = backend.handle_moderation(user)
630
        backend.send_result_notifications(result, user)
631
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
632
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
633

    
634
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
635
        r = self.client.get(ui_url('profile'), follow=True)
636
        self.assertFalse(r.context['request'].user.is_authenticated())
637
        self.assertFalse('_pithos2_a' in self.client.cookies)
638
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
639

    
640
        user = AstakosUser.objects.get(pk=user.pk)
641
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
642
                                               'password': 'password'},
643
                             follow=True)
644
        # user activated and logged in, token cookie set
645
        self.assertTrue(r.context['request'].user.is_authenticated())
646
        self.assertTrue('_pithos2_a' in self.client.cookies)
647
        cookies = self.client.cookies
648
        self.assertTrue(quote(user.auth_token) in
649
                        cookies.get('_pithos2_a').value)
650
        r = self.client.get(ui_url('logout'), follow=True)
651
        r = self.client.get(ui_url(''), follow=True)
652
        self.assertRedirects(r, ui_url('login'))
653
        # user logged out, token cookie removed
654
        self.assertFalse(r.context['request'].user.is_authenticated())
655
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
656

    
657
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
658
        del self.client.cookies['_pithos2_a']
659

    
660
        # user can login
661
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
662
                                               'password': 'password'},
663
                             follow=True)
664
        self.assertTrue(r.context['request'].user.is_authenticated())
665
        self.assertTrue('_pithos2_a' in self.client.cookies)
666
        cookies = self.client.cookies
667
        self.assertTrue(quote(user.auth_token) in
668
                        cookies.get('_pithos2_a').value)
669
        self.client.get(ui_url('logout'), follow=True)
670

    
671
        # user forgot password
672
        old_pass = user.password
673
        r = self.client.get(ui_url('local/password_reset'))
674
        self.assertEqual(r.status_code, 200)
675
        r = self.client.post(ui_url('local/password_reset'),
676
                             {'email': 'kpap@synnefo.org'})
677
        self.assertEqual(r.status_code, 302)
678
        # email sent
679
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
680

    
681
        # user visits change password link
682
        user = AstakosUser.objects.get(pk=user.pk)
683
        r = self.client.get(user.get_password_reset_url())
684
        r = self.client.post(user.get_password_reset_url(),
685
                             {'new_password1': 'newpass',
686
                              'new_password2': 'newpass'})
687

    
688
        user = AstakosUser.objects.get(pk=user.pk)
689
        self.assertNotEqual(old_pass, user.password)
690

    
691
        # old pass is not usable
692
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
693
                                               'password': 'password'})
694
        self.assertContains(r, 'Please enter a correct username and password')
695
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
696
                                               'password': 'newpass'},
697
                             follow=True)
698
        self.assertTrue(r.context['request'].user.is_authenticated())
699
        self.client.logout()
700

    
701
        # tests of special local backends
702
        user = AstakosUser.objects.get(pk=user.pk)
703
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
704
        user.save()
705

    
706
        # non astakos local backends do not support password reset
707
        r = self.client.get(ui_url('local/password_reset'))
708
        self.assertEqual(r.status_code, 200)
709
        r = self.client.post(ui_url('local/password_reset'),
710
                             {'email': 'kpap@synnefo.org'})
711
        # she can't because account is not active yet
712
        self.assertContains(r, "Changing password is not")
713

    
714
    def test_fix_superuser(self):
715
        u = User.objects.create(username="dummy", email="email@example.org",
716
                                first_name="Super", last_name="User",
717
                                is_superuser=True)
718
        User.objects.create(username="dummy2", email="email2@example.org",
719
                            first_name="Other", last_name="User")
720
        fixed = auth_functions.fix_superusers()
721
        self.assertEqual(len(fixed), 1)
722
        fuser = fixed[0]
723
        self.assertEqual(fuser.email, fuser.username)
724

    
725

    
726
class UserActionsTests(TestCase):
727

    
728
    def test_email_validation(self):
729
        backend = activation_backends.get_backend()
730
        form = backend.get_signup_form('local')
731
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
732
        user_data = {
733
            'email': 'kpap@synnefo.org',
734
            'first_name': 'Kostas Papas',
735
            'password1': '123',
736
            'password2': '123'
737
        }
738
        form = backend.get_signup_form(provider='local',
739
                                       initial_data=user_data)
740
        self.assertTrue(form.is_valid())
741
        user_data['email'] = 'kpap@synnefo.org.'
742
        form = backend.get_signup_form(provider='local',
743
                                       initial_data=user_data)
744
        self.assertFalse(form.is_valid())
745

    
746
    def test_email_change(self):
747
        # to test existing email validation
748
        get_local_user('existing@synnefo.org')
749

    
750
        # local user
751
        user = get_local_user('kpap@synnefo.org')
752

    
753
        # login as kpap
754
        self.client.login(username='kpap@synnefo.org', password='password')
755
        r = self.client.get(ui_url('profile'), follow=True)
756
        user = r.context['request'].user
757
        self.assertTrue(user.is_authenticated())
758

    
759
        # change email is enabled
760
        r = self.client.get(ui_url('email_change'))
761
        self.assertEqual(r.status_code, 200)
762
        self.assertFalse(user.email_change_is_pending())
763

    
764
        # invalid email format
765
        data = {'new_email_address': 'existing@synnefo.org.'}
766
        r = self.client.post(ui_url('email_change'), data)
767
        form = r.context['form']
768
        self.assertFalse(form.is_valid())
769

    
770
        # request email change to an existing email fails
771
        data = {'new_email_address': 'existing@synnefo.org'}
772
        r = self.client.post(ui_url('email_change'), data)
773

    
774
        self.assertContains(r, messages.EMAIL_USED)
775

    
776
        # proper email change
777
        data = {'new_email_address': 'kpap@gmail.com'}
778
        r = self.client.post(ui_url('email_change'), data, follow=True)
779
        self.assertRedirects(r, ui_url('profile'))
780
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
781
        change1 = EmailChange.objects.get()
782

    
783
        # user sees a warning
784
        r = self.client.get(ui_url('email_change'))
785
        self.assertEqual(r.status_code, 200)
786
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
787
        self.assertTrue(user.email_change_is_pending())
788

    
789
        # link was sent
790
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
791
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
792

    
793
        # proper email change
794
        data = {'new_email_address': 'kpap@yahoo.com'}
795
        r = self.client.post(ui_url('email_change'), data, follow=True)
796
        self.assertRedirects(r, ui_url('profile'))
797
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
798
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
799
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
800
        change2 = EmailChange.objects.get()
801

    
802
        r = self.client.get(change1.get_url())
803
        self.assertEquals(r.status_code, 404)
804
        self.client.logout()
805

    
806
        invalid_client = Client()
807
        r = invalid_client.post(ui_url('local?'),
808
                                {'username': 'existing@synnefo.org',
809
                                 'password': 'password'})
810
        r = invalid_client.get(change2.get_url(), follow=True)
811
        self.assertEquals(r.status_code, 403)
812

    
813
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
814
                             {'username': 'kpap@synnefo.org',
815
                              'password': 'password',
816
                              'next': change2.get_url()},
817
                             follow=True)
818
        self.assertRedirects(r, ui_url('profile'))
819
        user = r.context['request'].user
820
        self.assertEquals(user.email, 'kpap@yahoo.com')
821
        self.assertEquals(user.username, 'kpap@yahoo.com')
822

    
823
        self.client.logout()
824
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
825
                             {'username': 'kpap@synnefo.org',
826
                              'password': 'password',
827
                              'next': change2.get_url()},
828
                             follow=True)
829
        self.assertContains(r, "Please enter a correct username and password")
830
        self.assertEqual(user.emailchanges.count(), 0)
831

    
832
        AstakosUser.objects.all().delete()
833
        Group.objects.all().delete()
834

    
835

    
836
TEST_TARGETED_ID1 = \
837
    "https://idp.synnefo.org/idp/shibboleth!ZWxhIHJlIGVsYSByZSBlbGEgcmU="
838
TEST_TARGETED_ID2 = \
839
    "https://idp.synnefo.org/idp/shibboleth!ZGUgc2UgeGFsYXNlLi4uLi4uLg=="
840
TEST_TARGETED_ID3 = \
841
    "https://idp.synnefo.org/idp/shibboleth!"
842

    
843

    
844
class TestAuthProviderViews(TestCase):
845

    
846
    def tearDown(self):
847
        AstakosUser.objects.all().delete()
848

    
849
    @im_settings(IM_MODULES=['shibboleth'], MODERATION_ENABLED=False,
850
                 SHIBBOLETH_MIGRATE_EPPN=True)
851
    def migrate_to_remote_id(self):
852
        eppn_user = get_local_user("eppn@synnefo.org")
853
        tid_user = get_local_user("tid@synnefo.org")
854
        eppn_user.add_auth_provider('shibboleth', 'EPPN')
855
        tid_user.add_auth_provider('shibboleth', TEST_TARGETED_ID1)
856

    
857
        get_user = lambda r: r.context['request'].user
858

    
859
        client = ShibbolethClient()
860
        client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID2)
861
        r = client.get(ui_url('login/shibboleth?'), follow=True)
862
        self.assertTrue(get_user(r).is_authenticated())
863
        self.assertEqual(eppn_user.get_auth_provider('shibboleth').identifier,
864
                         TEST_TARGETED_ID2)
865

    
866
        client = ShibbolethClient()
867
        client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID1)
868
        r = client.get(ui_url('login/shibboleth?'), follow=True)
869
        self.assertTrue(get_user(r).is_authenticated())
870
        self.assertEqual(tid_user.get_auth_provider('shibboleth').identifier,
871
                         TEST_TARGETED_ID1)
872

    
873
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
874
                         AUTOMODERATE_POLICY=True)
875
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
876
                 HELPDESK=(('support', 'support@synnefo.org'),),
877
                 FORCE_PROFILE_UPDATE=False)
878
    def test_user(self):
879
        Profile = AuthProviderPolicyProfile
880
        Pending = PendingThirdPartyUser
881
        User = AstakosUser
882

    
883
        auth_functions.make_user("newuser@synnefo.org")
884
        get_local_user("olduser@synnefo.org")
885
        cl_olduser = ShibbolethClient()
886
        get_local_user("olduser2@synnefo.org")
887
        ShibbolethClient()
888
        cl_newuser = ShibbolethClient()
889
        cl_newuser2 = Client()
890

    
891
        academic_group, created = Group.objects.get_or_create(
892
            name='academic-login')
893
        academic_users = academic_group.user_set
894
        assert created
895
        policy_only_academic = Profile.objects.add_policy('academic_strict',
896
                                                          'shibboleth',
897
                                                          academic_group,
898
                                                          exclusive=True,
899
                                                          login=False,
900
                                                          add=False)
901

    
902
        # new academic user
903
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
904
        cl_newuser.set_tokens(remote_user="newusereppn",
905
                              mail="newuser@synnefo.org", surname="Lastname")
906
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
907
        initial = r.context['signup_form'].initial
908
        pending = Pending.objects.get()
909
        self.assertEqual(initial.get('last_name'), 'Lastname')
910
        self.assertEqual(initial.get('email'), 'newuser@synnefo.org')
911
        identifier = pending.third_party_identifier
912
        signup_data = {'third_party_identifier': identifier,
913
                       'first_name': 'Academic',
914
                       'third_party_token': pending.token,
915
                       'last_name': 'New User',
916
                       'provider': 'shibboleth'}
917
        r = cl_newuser.post(ui_url('signup'), signup_data)
918
        self.assertContains(r, "This field is required", )
919
        signup_data['email'] = 'olduser@synnefo.org'
920
        r = cl_newuser.post(ui_url('signup'), signup_data)
921
        self.assertContains(r, "already an account with this email", )
922
        signup_data['email'] = 'newuser@synnefo.org'
923
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
924
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
925
        self.assertEqual(r.status_code, 404)
926
        newuser = User.objects.get(email="newuser@synnefo.org")
927
        activation_link = newuser.get_activation_url()
928
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
929

    
930
        # new non-academic user
931
        signup_data = {'first_name': 'Non Academic',
932
                       'last_name': 'New User',
933
                       'provider': 'local',
934
                       'password1': 'password',
935
                       'password2': 'password'}
936
        signup_data['email'] = 'olduser@synnefo.org'
937
        r = cl_newuser2.post(ui_url('signup'), signup_data)
938
        self.assertContains(r, 'There is already an account with this '
939
                               'email address')
940
        signup_data['email'] = 'newuser@synnefo.org'
941
        r = cl_newuser2.post(ui_url('signup/'), signup_data)
942
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
943
        r = self.client.get(activation_link, follow=True)
944
        self.assertEqual(r.status_code, 200)
945
        self.assertContains(r, astakos_messages.INVALID_ACTIVATION_KEY)
946
        newuser = User.objects.get(email="newuser@synnefo.org")
947
        self.assertTrue(newuser.activation_sent)
948

    
949
        # activation sent, user didn't open verification url so additional
950
        # registrations invalidate the previous signups.
951
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
952
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
953
        pending = Pending.objects.get()
954
        identifier = pending.third_party_identifier
955
        signup_data = {'third_party_identifier': identifier,
956
                       u'first_name': 'Academic γιούνικοουντ',
957
                       'third_party_token': pending.token,
958
                       'last_name': 'New User',
959
                       'provider': 'shibboleth'}
960
        signup_data['email'] = 'newuser@synnefo.org'
961
        r = cl_newuser.post(ui_url('signup'), signup_data)
962
        self.assertEqual(r.status_code, 302)
963
        newuser = User.objects.get(email="newuser@synnefo.org")
964
        self.assertTrue(newuser.activation_sent)
965
        activation_link = newuser.get_activation_url()
966
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
967
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
968
        self.assertRedirects(r, ui_url('landing'))
969
        helpdesk_email = astakos_settings.HELPDESK[0][1]
970
        self.assertEqual(len(get_mailbox(helpdesk_email)), 1)
971
        self.assertTrue(u'AstakosUser: Academic γιούνικοουντ' in \
972
                            get_mailbox(helpdesk_email)[0].body)
973
        newuser = User.objects.get(email="newuser@synnefo.org")
974
        self.assertEqual(newuser.is_active, True)
975
        self.assertEqual(newuser.email_verified, True)
976
        cl_newuser.logout()
977

    
978
        # cannot reactivate if suspended
979
        newuser.is_active = False
980
        newuser.save()
981
        r = cl_newuser.get(newuser.get_activation_url())
982
        newuser = User.objects.get(email="newuser@synnefo.org")
983
        self.assertFalse(newuser.is_active)
984

    
985
        # release suspension
986
        newuser.is_active = True
987
        newuser.save()
988

    
989
        cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
990
        local = auth.get_provider('local', newuser)
991
        self.assertEqual(local.get_add_policy, False)
992
        self.assertEqual(local.get_login_policy, False)
993
        r = cl_newuser.get(local.get_add_url, follow=True)
994
        self.assertRedirects(r, ui_url('profile'))
995
        self.assertContains(r, 'disabled for your')
996

    
997
        cl_olduser.login(username='olduser@synnefo.org', password="password")
998
        r = cl_olduser.get(ui_url('profile'), follow=True)
999
        self.assertEqual(r.status_code, 200)
1000
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
1001
        self.assertContains(r, 'Your request is missing a unique token')
1002
        cl_olduser.set_tokens(remote_user="newusereppn")
1003
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
1004
        self.assertContains(r, 'already in use')
1005
        cl_olduser.set_tokens(remote_user="oldusereppn")
1006
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
1007
        self.assertContains(r, 'Academic login enabled for this account')
1008

    
1009
        user = User.objects.get(email="olduser@synnefo.org")
1010
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
1011
        local_provider = user.get_auth_provider('local')
1012
        self.assertEqual(shib_provider.get_remove_policy, True)
1013
        self.assertEqual(local_provider.get_remove_policy, True)
1014

    
1015
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
1016
                                                          'shibboleth',
1017
                                                          academic_group,
1018
                                                          remove=False)
1019
        user.groups.add(academic_group)
1020
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
1021
        local_provider = user.get_auth_provider('local')
1022
        self.assertEqual(shib_provider.get_remove_policy, False)
1023
        self.assertEqual(local_provider.get_remove_policy, True)
1024
        self.assertEqual(local_provider.get_login_policy, False)
1025

    
1026
        cl_olduser.logout()
1027
        login_data = {'username': 'olduser@synnefo.org',
1028
                      'password': 'password'}
1029
        r = cl_olduser.post(ui_url('local'), login_data, follow=True)
1030
        self.assertContains(r, "login/shibboleth'>Academic login")
1031
        Group.objects.all().delete()
1032

    
1033

    
1034
class TestAuthProvidersAPI(TestCase):
1035
    """
1036
    Test auth_providers module API
1037
    """
1038

    
1039
    def tearDown(self):
1040
        Group.objects.all().delete()
1041

    
1042
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1043
    def test_create(self):
1044
        user = auth_functions.make_user(email="kpap@synnefo.org")
1045
        user2 = auth_functions.make_user(email="kpap2@synnefo.org")
1046

    
1047
        module = 'shibboleth'
1048
        identifier = 'SHIB_UUID'
1049
        provider_params = {
1050
            'affiliation': 'UNIVERSITY',
1051
            'info': {'age': 27}
1052
        }
1053
        provider = auth.get_provider(module, user2, identifier,
1054
                                     **provider_params)
1055
        provider.add_to_user()
1056
        provider = auth.get_provider(module, user, identifier,
1057
                                     **provider_params)
1058
        provider.add_to_user()
1059
        user.email_verified = True
1060
        user.save()
1061
        self.assertRaises(Exception, provider.add_to_user)
1062
        provider = user.get_auth_provider(module, identifier)
1063
        self.assertEqual(user.get_auth_provider(
1064
            module, identifier)._instance.info.get('age'), 27)
1065

    
1066
        module = 'local'
1067
        identifier = None
1068
        provider_params = {'auth_backend': 'ldap', 'info':
1069
                          {'office': 'A1'}}
1070
        provider = auth.get_provider(module, user, identifier,
1071
                                     **provider_params)
1072
        provider.add_to_user()
1073
        self.assertFalse(provider.get_add_policy)
1074
        self.assertRaises(Exception, provider.add_to_user)
1075

    
1076
        shib = user.get_auth_provider('shibboleth',
1077
                                      'SHIB_UUID')
1078
        self.assertTrue(shib.get_remove_policy)
1079

    
1080
        local = user.get_auth_provider('local')
1081
        self.assertTrue(local.get_remove_policy)
1082

    
1083
        local.remove_from_user()
1084
        self.assertFalse(shib.get_remove_policy)
1085
        self.assertRaises(Exception, shib.remove_from_user)
1086

    
1087
        provider = user.get_auth_providers()[0]
1088
        self.assertRaises(Exception, provider.add_to_user)
1089

    
1090
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1091
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
1092
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
1093
                                                 'group2'])
1094
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
1095
                        CREATION_GROUPS_POLICY=['localgroup-create',
1096
                                                'group-create'])
1097
    def test_add_groups(self):
1098
        user = auth_functions.make_user("kpap@synnefo.org")
1099
        provider = auth.get_provider('shibboleth', user, 'test123')
1100
        provider.add_to_user()
1101
        user = AstakosUser.objects.get()
1102
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1103
                         sorted([u'group1', u'group2', u'group-create']))
1104

    
1105
        local = auth.get_provider('local', user)
1106
        local.add_to_user()
1107
        provider = user.get_auth_provider('shibboleth')
1108
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1109
        provider.remove_from_user()
1110
        user = AstakosUser.objects.get()
1111
        self.assertEqual(len(user.get_auth_providers()), 1)
1112
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1113
                         sorted([u'group-create', u'localgroup']))
1114

    
1115
        local = user.get_auth_provider('local')
1116
        self.assertRaises(Exception, local.remove_from_user)
1117
        provider = auth.get_provider('shibboleth', user, 'test123')
1118
        provider.add_to_user()
1119
        user = AstakosUser.objects.get()
1120
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1121
                         sorted([u'group-create', u'group1', u'group2',
1122
                                 u'localgroup']))
1123
        Group.objects.all().delete()
1124

    
1125
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1126
    def test_policies(self):
1127
        group_old, created = Group.objects.get_or_create(name='olduser')
1128

    
1129
        astakos_settings.MODERATION_ENABLED = True
1130
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1131
            ['academic-user']
1132
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1133
            ['google-user']
1134

    
1135
        user = auth_functions.make_user("kpap@synnefo.org")
1136
        user.groups.add(group_old)
1137
        user.add_auth_provider('local')
1138

    
1139
        user2 = auth_functions.make_user("kpap2@synnefo.org")
1140
        user2.add_auth_provider('shibboleth', identifier='shibid')
1141

    
1142
        user3 = auth_functions.make_user("kpap3@synnefo.org")
1143
        user3.groups.add(group_old)
1144
        user3.add_auth_provider('local')
1145
        user3.add_auth_provider('shibboleth', identifier='1234')
1146

    
1147
        self.assertTrue(user2.groups.get(name='academic-user'))
1148
        self.assertFalse(user2.groups.filter(name='olduser').count())
1149

    
1150
        local = auth_providers.get_provider('local')
1151
        self.assertTrue(local.get_add_policy)
1152

    
1153
        academic_group = Group.objects.get(name='academic-user')
1154
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1155
                                                     academic_group,
1156
                                                     exclusive=True,
1157
                                                     add=False,
1158
                                                     login=False)
1159
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1160
                                                     academic_group,
1161
                                                     exclusive=True,
1162
                                                     login=False,
1163
                                                     add=False)
1164
        # no duplicate entry gets created
1165
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1166

    
1167
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1168
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1169
                                                     user2,
1170
                                                     remove=False)
1171
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1172

    
1173
        local = auth_providers.get_provider('local', user2)
1174
        google = auth_providers.get_provider('google', user2)
1175
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1176
        self.assertTrue(shibboleth.get_login_policy)
1177
        self.assertFalse(shibboleth.get_remove_policy)
1178
        self.assertFalse(local.get_add_policy)
1179
        self.assertFalse(local.get_add_policy)
1180
        self.assertFalse(google.get_add_policy)
1181

    
1182
        user2.groups.remove(Group.objects.get(name='academic-user'))
1183
        self.assertTrue(local.get_add_policy)
1184
        self.assertTrue(google.get_add_policy)
1185
        user2.groups.add(Group.objects.get(name='academic-user'))
1186

    
1187
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1188
                                                     user2,
1189
                                                     exclusive=True,
1190
                                                     add=True)
1191
        self.assertTrue(local.get_add_policy)
1192
        self.assertTrue(google.get_add_policy)
1193

    
1194
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1195
        self.assertFalse(local.get_automoderate_policy)
1196
        self.assertFalse(google.get_automoderate_policy)
1197
        self.assertTrue(shibboleth.get_automoderate_policy)
1198

    
1199
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1200
                  'GOOGLE_ADD_GROUPS_POLICY']:
1201
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1202

    
1203
    @shibboleth_settings(CREATE_POLICY=True)
1204
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1205
    def test_create_http(self):
1206
        # this should be wrapped inside a transaction
1207
        user = auth_functions.make_user(email="test@test.com")
1208
        provider = auth_providers.get_provider('shibboleth', user,
1209
                                               'test@academia.test')
1210
        provider.add_to_user()
1211
        user.get_auth_provider('shibboleth', 'test@academia.test')
1212
        provider = auth_providers.get_provider('local', user)
1213
        provider.add_to_user()
1214
        user.get_auth_provider('local')
1215

    
1216
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1217
        user = auth_functions.make_user("test2@test.com")
1218
        provider = auth_providers.get_provider('shibboleth', user,
1219
                                               'test@shibboleth.com',
1220
                                               **{'info': {'name':
1221
                                                           'User Test'}})
1222
        self.assertFalse(provider.get_create_policy)
1223
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1224
        self.assertTrue(provider.get_create_policy)
1225
        academic = provider.add_to_user()
1226

    
1227
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1228
    @shibboleth_settings(LIMIT_POLICY=2)
1229
    def test_policies(self):
1230
        user = get_local_user('kpap@synnefo.org')
1231
        user.add_auth_provider('shibboleth', identifier='1234')
1232
        user.add_auth_provider('shibboleth', identifier='12345')
1233

    
1234
        # default limit is 1
1235
        local = user.get_auth_provider('local')
1236
        self.assertEqual(local.get_add_policy, False)
1237

    
1238
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1239
        academic = user.get_auth_provider('shibboleth',
1240
                                          identifier='1234')
1241
        self.assertEqual(academic.get_add_policy, False)
1242
        newacademic = auth_providers.get_provider('shibboleth', user,
1243
                                                  identifier='123456')
1244
        self.assertEqual(newacademic.get_add_policy, True)
1245
        user.add_auth_provider('shibboleth', identifier='123456')
1246
        self.assertEqual(academic.get_add_policy, False)
1247
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1248

    
1249
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1250
    @shibboleth_settings(LIMIT_POLICY=2)
1251
    def test_messages(self):
1252
        user = get_local_user('kpap@synnefo.org')
1253
        user.add_auth_provider('shibboleth', identifier='1234')
1254
        user.add_auth_provider('shibboleth', identifier='12345')
1255
        provider = auth_providers.get_provider('shibboleth')
1256
        self.assertEqual(provider.get_message('title'), 'Academic')
1257
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1258
        # regenerate messages cache
1259
        provider = auth_providers.get_provider('shibboleth')
1260
        self.assertEqual(provider.get_message('title'), 'New title')
1261
        self.assertEqual(provider.get_message('login_title'),
1262
                         'New title LOGIN')
1263
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1264
        self.assertEqual(provider.get_module_icon,
1265
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1266
        self.assertEqual(provider.get_module_medium_icon,
1267
                         settings.MEDIA_URL +
1268
                         'im/auth/icons-medium/shibboleth.png')
1269

    
1270
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1271
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1272
        self.assertEqual(provider.get_method_details_msg,
1273
                         'Account: 12345')
1274
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1275
        self.assertEqual(provider.get_method_details_msg,
1276
                         'Account: 1234')
1277

    
1278
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1279
        self.assertEqual(provider.get_not_active_msg,
1280
                         "'Academic login' is disabled.")
1281

    
1282
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1283
    @shibboleth_settings(LIMIT_POLICY=2)
1284
    def test_templates(self):
1285
        user = get_local_user('kpap@synnefo.org')
1286
        user.add_auth_provider('shibboleth', identifier='1234')
1287
        user.add_auth_provider('shibboleth', identifier='12345')
1288

    
1289
        provider = auth_providers.get_provider('shibboleth')
1290
        self.assertEqual(provider.get_template('login'),
1291
                         'im/auth/shibboleth_login.html')
1292
        provider = auth_providers.get_provider('google')
1293
        self.assertEqual(provider.get_template('login'),
1294
                         'im/auth/generic_login.html')
1295

    
1296

    
1297
class TestActivationBackend(TestCase):
1298

    
1299
    def setUp(self):
1300
        # dummy call to pass through logging middleware
1301
        self.client.get(ui_url(''))
1302

    
1303
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1304
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1305
    def test_policies(self):
1306
        backend = activation_backends.get_backend()
1307

    
1308
        # email matches RE_USER_EMAIL_PATTERNS
1309
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1310
                               is_active=False, email_verified=False)
1311
        backend.handle_verification(user1, user1.verification_code)
1312
        self.assertEqual(user1.accepted_policy, 'email')
1313

    
1314
        # manually moderated
1315
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1316
                               is_active=False, email_verified=False)
1317

    
1318
        backend.handle_verification(user2, user2.verification_code)
1319
        self.assertEqual(user2.moderated, False)
1320
        backend.handle_moderation(user2)
1321
        self.assertEqual(user2.moderated, True)
1322
        self.assertEqual(user2.accepted_policy, 'manual')
1323

    
1324
        # autoaccept due to provider automoderate policy
1325
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1326
                               is_active=False, email_verified=False)
1327
        user3.auth_providers.all().delete()
1328
        user3.add_auth_provider('shibboleth', identifier='shib123')
1329
        backend.handle_verification(user3, user3.verification_code)
1330
        self.assertEqual(user3.moderated, True)
1331
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1332

    
1333
    @im_settings(MODERATION_ENABLED=False,
1334
                 MANAGERS=(('Manager',
1335
                            'manager@synnefo.org'),),
1336
                 HELPDESK=(('Helpdesk',
1337
                            'helpdesk@synnefo.org'),),
1338
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1339
    def test_without_moderation(self):
1340
        backend = activation_backends.get_backend()
1341
        form = backend.get_signup_form('local')
1342
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1343

    
1344
        user_data = {
1345
            'email': 'kpap@synnefo.org',
1346
            'first_name': 'Kostas Papas',
1347
            'password1': '123',
1348
            'password2': '123'
1349
        }
1350
        form = backend.get_signup_form('local', user_data)
1351
        user = form.create_user()
1352
        self.assertEqual(user.is_active, False)
1353
        self.assertEqual(user.email_verified, False)
1354

    
1355
        # step one, registration
1356
        result = backend.handle_registration(user)
1357
        user = AstakosUser.objects.get()
1358
        self.assertEqual(user.is_active, False)
1359
        self.assertEqual(user.email_verified, False)
1360
        self.assertTrue(user.verification_code)
1361
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1362
        backend.send_result_notifications(result, user)
1363
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1364
        self.assertEqual(len(mail.outbox), 1)
1365

    
1366
        # step two, verify email (automatically
1367
        # moderates/accepts user, since moderation is disabled)
1368
        user = AstakosUser.objects.get()
1369
        valid_code = user.verification_code
1370

    
1371
        # test invalid code
1372
        result = backend.handle_verification(user, valid_code)
1373
        backend.send_result_notifications(result, user)
1374
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1375
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1376
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1377
        # verification + activated + greeting = 3
1378
        self.assertEqual(len(mail.outbox), 3)
1379
        user = AstakosUser.objects.get()
1380
        self.assertEqual(user.is_active, True)
1381
        self.assertEqual(user.moderated, True)
1382
        self.assertTrue(user.moderated_at)
1383
        self.assertEqual(user.email_verified, True)
1384
        self.assertTrue(user.activation_sent)
1385

    
1386
    @im_settings(MODERATION_ENABLED=True,
1387
                 MANAGERS=(('Manager',
1388
                            'manager@synnefo.org'),),
1389
                 HELPDESK=(('Helpdesk',
1390
                            'helpdesk@synnefo.org'),),
1391
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1392
    def test_with_moderation(self):
1393

    
1394
        backend = activation_backends.get_backend()
1395
        form = backend.get_signup_form('local')
1396
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1397

    
1398
        user_data = {
1399
            'email': 'kpap@synnefo.org',
1400
            'first_name': 'Kostas Papas',
1401
            'password1': '123',
1402
            'password2': '123'
1403
        }
1404
        form = backend.get_signup_form(provider='local',
1405
                                       initial_data=user_data)
1406
        user = form.create_user()
1407
        self.assertEqual(user.is_active, False)
1408
        self.assertEqual(user.email_verified, False)
1409

    
1410
        # step one, registration
1411
        result = backend.handle_registration(user)
1412
        user = AstakosUser.objects.get()
1413
        self.assertEqual(user.is_active, False)
1414
        self.assertEqual(user.email_verified, False)
1415
        self.assertTrue(user.verification_code)
1416
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1417
        backend.send_result_notifications(result, user)
1418
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1419
        self.assertEqual(len(mail.outbox), 1)
1420

    
1421
        # step two, verifying email
1422
        user = AstakosUser.objects.get()
1423
        valid_code = user.verification_code
1424
        invalid_code = user.verification_code + 'invalid'
1425

    
1426
        # test invalid code
1427
        result = backend.handle_verification(user, invalid_code)
1428
        self.assertEqual(result.status, backend.Result.ERROR)
1429
        backend.send_result_notifications(result, user)
1430
        user = AstakosUser.objects.get()
1431
        self.assertEqual(user.is_active, False)
1432
        self.assertEqual(user.moderated, False)
1433
        self.assertEqual(user.moderated_at, None)
1434
        self.assertEqual(user.email_verified, False)
1435
        self.assertTrue(user.activation_sent)
1436

    
1437
        # test valid code
1438
        user = AstakosUser.objects.get()
1439
        result = backend.handle_verification(user, valid_code)
1440
        backend.send_result_notifications(result, user)
1441
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1442
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1443
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1444
        self.assertEqual(len(mail.outbox), 2)
1445
        user = AstakosUser.objects.get()
1446
        self.assertEqual(user.moderated, False)
1447
        self.assertEqual(user.moderated_at, None)
1448
        self.assertEqual(user.email_verified, True)
1449
        self.assertTrue(user.activation_sent)
1450

    
1451
        # test code reuse
1452
        result = backend.handle_verification(user, valid_code)
1453
        self.assertEqual(result.status, backend.Result.ERROR)
1454
        user = AstakosUser.objects.get()
1455
        self.assertEqual(user.is_active, False)
1456
        self.assertEqual(user.moderated, False)
1457
        self.assertEqual(user.moderated_at, None)
1458
        self.assertEqual(user.email_verified, True)
1459
        self.assertTrue(user.activation_sent)
1460

    
1461
        # valid code on verified user
1462
        user = AstakosUser.objects.get()
1463
        valid_code = user.verification_code
1464
        result = backend.handle_verification(user, valid_code)
1465
        self.assertEqual(result.status, backend.Result.ERROR)
1466

    
1467
        # step three, moderation user
1468
        user = AstakosUser.objects.get()
1469
        result = backend.handle_moderation(user)
1470
        backend.send_result_notifications(result, user)
1471

    
1472
        user = AstakosUser.objects.get()
1473
        self.assertEqual(user.is_active, True)
1474
        self.assertEqual(user.moderated, True)
1475
        self.assertTrue(user.moderated_at)
1476
        self.assertEqual(user.email_verified, True)
1477
        self.assertTrue(user.activation_sent)
1478

    
1479

    
1480
class TestWebloginRedirect(TestCase):
1481

    
1482
    @with_settings(settings, COOKIE_DOMAIN='.astakos.synnefo.org')
1483
    def test_restricts_domains(self):
1484
        get_local_user('user1@synnefo.org')
1485

    
1486
        # next url construct helpers
1487
        weblogin = lambda nxt: reverse('weblogin') + '?next=%s' % nxt
1488
        weblogin_quoted = lambda nxt: reverse('weblogin') + '?next=%s' % \
1489
            urllib.quote_plus(nxt)
1490

    
1491
        # common cases
1492
        invalid_domain = weblogin("https://www.invaliddomain.synnefo.org")
1493
        invalid_scheme = weblogin("customscheme://localhost")
1494
        invalid_scheme_with_valid_domain = \
1495
            weblogin("http://www.invaliddomain.com")
1496
        valid_scheme = weblogin("pithos://localhost/")
1497
        # to be used in assertRedirects
1498
        valid_scheme_quoted = weblogin_quoted("pithos://localhost/")
1499

    
1500
        # not authenticated, redirects to login which contains next param with
1501
        # additional nested quoted next params
1502
        r = self.client.get(valid_scheme, follow=True)
1503
        login_redirect = reverse('login') + '?next=' + \
1504
            urllib.quote_plus("http://testserver" + valid_scheme_quoted)
1505
        self.assertRedirects(r, login_redirect)
1506

    
1507
        # authenticate client
1508
        self.client.login(username="user1@synnefo.org", password="password")
1509

    
1510
        # valid scheme
1511
        r = self.client.get(valid_scheme, follow=True)
1512
        url = r.redirect_chain[1][0]
1513
        # scheme preserved
1514
        self.assertTrue(url.startswith('pithos://localhost/'))
1515
        # redirect contains token param
1516
        params = urlparse.urlparse(url.replace('pithos', 'https'),
1517
                                   scheme='https').query
1518
        params = urlparse.parse_qs(params)
1519
        self.assertEqual(params['token'][0],
1520
                         AstakosUser.objects.get().auth_token)
1521
        # does not contain uuid
1522
        # reverted for 0.14.2 to support old pithos desktop clients
1523
        #self.assertFalse('uuid' in params)
1524

    
1525
        # invalid cases
1526
        r = self.client.get(invalid_scheme, follow=True)
1527
        self.assertEqual(r.status_code, 403)
1528

    
1529
        r = self.client.get(invalid_scheme_with_valid_domain, follow=True)
1530
        self.assertEqual(r.status_code, 403)
1531

    
1532
        r = self.client.get(invalid_domain, follow=True)
1533
        self.assertEqual(r.status_code, 403)