Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ 5bc77346

History | View | Annotate | Download (63 kB)

1
# -*- coding: utf-8 -*-
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
import urlparse
36
import urllib
37

    
38
from astakos.im.tests.common import *
39

    
40
ui_url = lambda url: '/' + astakos_settings.BASE_PATH + '/ui/%s' % url
41

    
42

    
43
class ShibbolethTests(TestCase):
44
    """
45
    Testing shibboleth authentication.
46
    """
47

    
48
    def setUp(self):
49
        self.client = ShibbolethClient()
50
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
51
        astakos_settings.MODERATION_ENABLED = True
52

    
53
    def tearDown(self):
54
        AstakosUser.objects.all().delete()
55

    
56
    @im_settings(FORCE_PROFILE_UPDATE=False)
57
    def test_create_account(self):
58

    
59
        client = ShibbolethClient()
60

    
61
        # shibboleth views validation
62
        # eepn required
63
        r = client.get(ui_url('login/shibboleth?'), follow=True)
64
        self.assertContains(r, messages.SHIBBOLETH_MISSING_USER_ID % {
65
            'domain': astakos_settings.BASE_HOST,
66
            'contact_email': settings.CONTACT_EMAIL
67
        })
68
        client.set_tokens(remote_user="kpapeppn", eppn="kpapeppn")
69

    
70
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
71
        # shibboleth user info required
72
        r = client.get(ui_url('login/shibboleth?'), follow=True)
73
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
74
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
75

    
76
        # shibboleth logged us in
77
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
78
                          cn="Kostas Papadimitriou",
79
                          ep_affiliation="Test Affiliation")
80
        r = client.get(ui_url('login/shibboleth?'), follow=True)
81
        token = PendingThirdPartyUser.objects.get().token
82
        self.assertRedirects(r, ui_url('signup?third_party_token=%s' % token))
83
        self.assertEqual(r.status_code, 200)
84

    
85
        # a new pending user created
86
        pending_user = PendingThirdPartyUser.objects.get(
87
            third_party_identifier="kpapeppn")
88
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
89
        # keep the token for future use
90
        token = pending_user.token
91
        # from now on no shibboleth headers are sent to the server
92
        client.reset_tokens()
93

    
94
        # this is the old way, it should fail, to avoid pending user take over
95
        r = client.get(ui_url('shibboleth/signup/%s' % pending_user.username))
96
        self.assertEqual(r.status_code, 404)
97

    
98
        # this is the signup unique url associated with the pending user
99
        # created
100
        r = client.get(ui_url('signup/?third_party_token=%s' % token))
101
        identifier = pending_user.third_party_identifier
102
        post_data = {'third_party_identifier': identifier,
103
                     'first_name': 'Kostas',
104
                     'third_party_token': token,
105
                     'last_name': 'Mitroglou',
106
                     'provider': 'shibboleth'}
107

    
108
        signup_url = reverse('signup')
109

    
110
        # invlid email
111
        post_data['email'] = 'kpap'
112
        r = client.post(signup_url, post_data)
113
        self.assertContains(r, token)
114

    
115
        # existing email
116
        existing_user = get_local_user('test@test.com')
117
        post_data['email'] = 'test@test.com'
118
        r = client.post(signup_url, post_data)
119
        self.assertContains(r, messages.EMAIL_USED)
120
        existing_user.delete()
121

    
122
        # and finally a valid signup
123
        post_data['email'] = 'kpap@synnefo.org'
124
        r = client.post(signup_url, post_data, follow=True)
125
        self.assertContains(r, messages.VERIFICATION_SENT)
126

    
127
        # entires commited as expected
128
        self.assertEqual(AstakosUser.objects.count(), 1)
129
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
130
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
131

    
132
        # provider info stored
133
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
134
        self.assertEqual(provider.affiliation, 'Test Affiliation')
135
        self.assertEqual(provider.info['email'], u'kpap@synnefo.org')
136
        self.assertEqual(provider.info['eppn'], u'kpapeppn')
137
        self.assertEqual(provider.info['name'], u'Kostas Papadimitriou')
138
        self.assertTrue('headers' in provider.info)
139

    
140
        # login (not activated yet)
141
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
142
                          cn="Kostas Papadimitriou")
143
        r = client.get(ui_url("login/shibboleth?"), follow=True)
144
        self.assertContains(r, 'is pending moderation')
145

    
146
        # admin activates the user
147
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
148
        backend = activation_backends.get_backend()
149
        activation_result = backend.verify_user(u, u.verification_code)
150
        activation_result = backend.accept_user(u)
151
        self.assertFalse(activation_result.is_error())
152
        backend.send_result_notifications(activation_result, u)
153
        self.assertEqual(u.is_active, True)
154

    
155
        # we see our profile
156
        r = client.get(ui_url("login/shibboleth?"), follow=True)
157
        self.assertRedirects(r, ui_url('landing'))
158
        self.assertEqual(r.status_code, 200)
159

    
160
    def test_existing(self):
161
        """
162
        Test adding of third party login to an existing account
163
        """
164

    
165
        # this is our existing user
166
        existing_user = get_local_user('kpap@synnefo.org')
167
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
168
        existing_inactive.is_active = False
169
        existing_inactive.save()
170

    
171
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
172
        existing_unverified.is_active = False
173
        existing_unverified.activation_sent = None
174
        existing_unverified.email_verified = False
175
        existing_unverified.is_verified = False
176
        existing_unverified.save()
177

    
178
        client = ShibbolethClient()
179
        # shibboleth logged us in, notice that we use different email
180
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
181
                          cn="Kostas Papadimitriou", )
182
        r = client.get(ui_url("login/shibboleth?"), follow=True)
183

    
184
        # a new pending user created
185
        pending_user = PendingThirdPartyUser.objects.get()
186
        token = pending_user.token
187
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
188
        pending_key = pending_user.token
189
        client.reset_tokens()
190
        self.assertRedirects(r, ui_url("signup?third_party_token=%s" % token))
191

    
192
        form = r.context['login_form']
193
        signupdata = copy.copy(form.initial)
194
        signupdata['email'] = 'kpap@synnefo.org'
195
        signupdata['third_party_token'] = token
196
        signupdata['provider'] = 'shibboleth'
197
        signupdata.pop('id', None)
198

    
199
        # the email exists to another user
200
        r = client.post(ui_url("signup"), signupdata)
201
        self.assertContains(r, "There is already an account with this email "
202
                               "address")
203
        # change the case, still cannot create
204
        signupdata['email'] = 'KPAP@synnefo.org'
205
        r = client.post(ui_url("signup"), signupdata)
206
        self.assertContains(r, "There is already an account with this email "
207
                               "address")
208
        # inactive user
209
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
210
        r = client.post(ui_url("signup"), signupdata)
211
        self.assertContains(r, "There is already an account with this email "
212
                               "address")
213

    
214
        # unverified user, this should pass, old entry will be deleted
215
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
216
        r = client.post(ui_url("signup"), signupdata)
217

    
218
        post_data = {'password': 'password',
219
                     'username': 'kpap@synnefo.org'}
220
        r = client.post(ui_url('local'), post_data, follow=True)
221
        self.assertTrue(r.context['request'].user.is_authenticated())
222
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
223
                          cn="Kostas Papadimitriou", )
224
        r = client.get(ui_url("login/shibboleth?"), follow=True)
225
        self.assertContains(r, "enabled for this account")
226
        client.reset_tokens()
227

    
228
        user = existing_user
229
        self.assertTrue(user.has_auth_provider('shibboleth'))
230
        self.assertTrue(user.has_auth_provider('local',
231
                                               auth_backend='astakos'))
232
        client.logout()
233

    
234
        # look Ma, i can login with both my shibboleth and local account
235
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
236
                          cn="Kostas Papadimitriou")
237
        r = client.get(ui_url("login/shibboleth?"), follow=True)
238
        self.assertTrue(r.context['request'].user.is_authenticated())
239
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
240
        self.assertRedirects(r, ui_url('landing'))
241
        self.assertEqual(r.status_code, 200)
242
        client.logout()
243
        client.reset_tokens()
244

    
245
        # logged out
246
        r = client.get(ui_url("profile"), follow=True)
247
        self.assertFalse(r.context['request'].user.is_authenticated())
248

    
249
        # login with local account also works
250
        post_data = {'password': 'password',
251
                     'username': 'kpap@synnefo.org'}
252
        r = self.client.post(ui_url('local'), post_data, follow=True)
253
        self.assertTrue(r.context['request'].user.is_authenticated())
254
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
255
        self.assertRedirects(r, ui_url('landing'))
256
        self.assertEqual(r.status_code, 200)
257

    
258
        # cannot add the same eppn
259
        client.set_tokens(mail="secondary@shibboleth.gr",
260
                          remote_user="kpapeppn",
261
                          cn="Kostas Papadimitriou", )
262
        r = client.get(ui_url("login/shibboleth?"), follow=True)
263
        self.assertRedirects(r, ui_url('landing'))
264
        self.assertTrue(r.status_code, 200)
265
        self.assertEquals(existing_user.auth_providers.count(), 2)
266

    
267
        # only one allowed by default
268
        client.set_tokens(mail="secondary@shibboleth.gr",
269
                          remote_user="kpapeppn2",
270
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
271
        prov = auth_providers.get_provider('shibboleth')
272
        r = client.get(ui_url("login/shibboleth?"), follow=True)
273
        self.assertContains(r, "Failed to add")
274
        self.assertRedirects(r, ui_url('profile'))
275
        self.assertTrue(r.status_code, 200)
276
        self.assertEquals(existing_user.auth_providers.count(), 2)
277
        client.logout()
278
        client.reset_tokens()
279

    
280
        # cannot login with another eppn
281
        client.set_tokens(mail="kpap@synnefo.org",
282
                          remote_user="kpapeppninvalid",
283
                          cn="Kostas Papadimitriou")
284
        r = client.get(ui_url("login/shibboleth?"), follow=True)
285
        self.assertFalse(r.context['request'].user.is_authenticated())
286

    
287
        # cannot
288

    
289
        # lets remove local password
290
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
291
                                       email="kpap@synnefo.org")
292
        remove_local_url = user.get_auth_provider('local').get_remove_url
293
        remove_shibbo_url = user.get_auth_provider('shibboleth',
294
                                                   'kpapeppn').get_remove_url
295
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
296
                          cn="Kostas Papadimtriou")
297
        r = client.get(ui_url("login/shibboleth?"), follow=True)
298
        client.reset_tokens()
299

    
300
        # only POST is allowed (for CSRF protection)
301
        r = client.get(remove_local_url, follow=True)
302
        self.assertEqual(r.status_code, 405)
303

    
304
        r = client.post(remove_local_url, follow=True)
305
        # 2 providers left
306
        self.assertEqual(user.auth_providers.count(), 1)
307
        # cannot remove last provider
308
        r = client.post(remove_shibbo_url)
309
        self.assertEqual(r.status_code, 403)
310
        self.client.logout()
311

    
312
        # cannot login using local credentials (notice we use another client)
313
        post_data = {'password': 'password',
314
                     'username': 'kpap@synnefo.org'}
315
        r = self.client.post(ui_url('local'), post_data, follow=True)
316
        self.assertFalse(r.context['request'].user.is_authenticated())
317

    
318
        # we can reenable the local provider by setting a password
319
        r = client.get(ui_url("password_change"), follow=True)
320
        r = client.post(ui_url("password_change"), {'new_password1': '111',
321
                                                    'new_password2': '111'},
322
                        follow=True)
323
        user = r.context['request'].user
324
        self.assertTrue(user.has_auth_provider('local'))
325
        self.assertTrue(user.has_auth_provider('shibboleth'))
326
        self.assertTrue(user.check_password('111'))
327
        self.assertTrue(user.has_usable_password())
328

    
329
        # change password via profile form
330
        r = client.post(ui_url("profile"), {
331
            'old_password': '111',
332
            'new_password': '',
333
            'new_password2': '',
334
            'change_password': 'on',
335
        }, follow=False)
336
        self.assertEqual(r.status_code, 200)
337
        self.assertFalse(r.context['profile_form'].is_valid())
338

    
339
        self.client.logout()
340

    
341
        # now we can login
342
        post_data = {'password': '111',
343
                     'username': 'kpap@synnefo.org'}
344
        r = self.client.post(ui_url('local'), post_data, follow=True)
345
        self.assertTrue(r.context['request'].user.is_authenticated())
346

    
347
        client.reset_tokens()
348

    
349
        # we cannot take over another shibboleth identifier
350
        user2 = get_local_user('another@synnefo.org')
351
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
352
        # login
353
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
354
                          cn="Kostas Papadimitriou")
355
        r = client.get(ui_url("login/shibboleth?"), follow=True)
356
        # try to assign existing shibboleth identifier of another user
357
        client.set_tokens(mail="kpap_second@shibboleth.gr",
358
                          remote_user="existingeppn",
359
                          cn="Kostas Papadimitriou")
360
        r = client.get(ui_url("login/shibboleth?"), follow=True)
361
        self.assertContains(r, "is already in use")
362

    
363

    
364
class TestLocal(TestCase):
365

    
366
    def setUp(self):
367
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
368
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
369
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
370
        settings.ASTAKOS_MODERATION_ENABLED = True
371

    
372
    def tearDown(self):
373
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
374
        AstakosUser.objects.all().delete()
375

    
376
    @im_settings(RECAPTCHA_ENABLED=True, RATELIMIT_RETRIES_ALLOWED=3)
377
    def test_login_ratelimit(self):
378
        from django.core.cache import cache
379
        cache.clear()
380

    
381
        credentials = {'username': 'γιού τι έφ', 'password': 'password'}
382
        r = self.client.post(ui_url('local'), credentials, follow=True)
383
        fields = r.context['login_form'].fields.keyOrder
384
        self.assertFalse('recaptcha_challenge_field' in fields)
385
        r = self.client.post(ui_url('local'), credentials, follow=True)
386
        fields = r.context['login_form'].fields.keyOrder
387
        self.assertFalse('recaptcha_challenge_field' in fields)
388
        r = self.client.post(ui_url('local'), credentials, follow=True)
389
        fields = r.context['login_form'].fields.keyOrder
390
        self.assertTrue('recaptcha_challenge_field' in fields)
391
        r = self.client.post(ui_url('local'), follow=True)
392
        fields = r.context['login_form'].fields.keyOrder
393
        self.assertTrue('recaptcha_challenge_field' in fields)
394

    
395
    def test_no_moderation(self):
396
        # disable moderation
397
        astakos_settings.MODERATION_ENABLED = False
398

    
399
        # create a new user
400
        r = self.client.get(ui_url("signup"))
401
        self.assertEqual(r.status_code, 200)
402
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
403
                'password2': 'password', 'first_name': 'Kostas',
404
                'last_name': 'Mitroglou', 'provider': 'local'}
405
        r = self.client.post(ui_url("signup"), data)
406

    
407
        # user created
408
        self.assertEqual(AstakosUser.objects.count(), 1)
409
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
410
                                       email="kpap@synnefo.org")
411
        self.assertEqual(user.username, 'kpap@synnefo.org')
412
        self.assertEqual(user.has_auth_provider('local'), True)
413
        self.assertFalse(user.is_active)
414

    
415
        # user (but not admin) gets notified
416
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
417
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
418
        astakos_settings.MODERATION_ENABLED = True
419

    
420
    def test_email_case(self):
421
        data = {
422
            'email': 'kPap@synnefo.org',
423
            'password1': '1234',
424
            'password2': '1234'
425
        }
426

    
427
        form = forms.LocalUserCreationForm(data)
428
        self.assertTrue(form.is_valid())
429
        user = form.save()
430
        form.store_user(user, {})
431

    
432
        u = AstakosUser.objects.get()
433
        self.assertEqual(u.email, 'kPap@synnefo.org')
434
        self.assertEqual(u.username, 'kpap@synnefo.org')
435
        u.is_active = True
436
        u.email_verified = True
437
        u.save()
438

    
439
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
440
        login = forms.LoginForm(data=data)
441
        self.assertTrue(login.is_valid())
442

    
443
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
444
        login = forms.LoginForm(data=data)
445
        self.assertTrue(login.is_valid())
446

    
447
        data = {
448
            'email': 'kpap@synnefo.org',
449
            'password1': '1234',
450
            'password2': '1234'
451
        }
452
        form = forms.LocalUserCreationForm(data)
453
        self.assertFalse(form.is_valid())
454

    
455
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
456
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
457
    def test_local_provider(self):
458
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
459

    
460
        # create a user
461
        r = self.client.get(ui_url("signup"))
462
        self.assertEqual(r.status_code, 200)
463
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
464
                'password2': 'password', 'first_name': 'Kostas',
465
                'last_name': 'Mitroglou', 'provider': 'local'}
466
        r = self.client.post(ui_url("signup"), data)
467

    
468
        # user created
469
        self.assertEqual(AstakosUser.objects.count(), 1)
470
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
471
                                       email="kpap@synnefo.org")
472
        self.assertEqual(user.username, 'kpap@synnefo.org')
473
        self.assertEqual(user.has_auth_provider('local'), True)
474
        self.assertFalse(user.is_active)  # not activated
475
        self.assertFalse(user.email_verified)  # not verified
476
        self.assertTrue(user.activation_sent)  # activation automatically sent
477
        self.assertFalse(user.moderated)
478
        self.assertFalse(user.email_verified)
479

    
480
        # admin gets notified and activates the user from the command line
481
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
482
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
483
                                               'password': 'password'},
484
                             follow=True)
485
        self.assertContains(r, messages.VERIFICATION_SENT)
486
        backend = activation_backends.get_backend()
487

    
488
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
489
        backend.send_user_verification_email(user)
490

    
491
        # user activation fields updated and user gets notified via email
492
        user = AstakosUser.objects.get(pk=user.pk)
493
        self.assertTrue(user.activation_sent)
494
        self.assertFalse(user.email_verified)
495
        self.assertFalse(user.is_active)
496
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
497

    
498
        # user forgot she got registered and tries to submit registration
499
        # form. Notice the upper case in email
500
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
501
                'password2': 'password', 'first_name': 'Kostas',
502
                'last_name': 'Mitroglou', 'provider': 'local'}
503
        r = self.client.post(ui_url("signup"), data, follow=True)
504
        self.assertRedirects(r, reverse('login'))
505
        self.assertContains(r, messages.VERIFICATION_SENT)
506

    
507
        user = AstakosUser.objects.get()
508
        # previous user replaced
509
        self.assertTrue(user.activation_sent)
510
        self.assertFalse(user.email_verified)
511
        self.assertFalse(user.is_active)
512
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
513

    
514
        # hmmm, email exists; lets request a password change
515
        r = self.client.get(ui_url('local/password_reset'))
516
        self.assertEqual(r.status_code, 200)
517
        data = {'email': 'kpap@synnefo.org'}
518
        r = self.client.post(ui_url('local/password_reset'), data, follow=True)
519
        # she can't because account is not active yet
520
        self.assertContains(r, 'pending activation')
521

    
522
        # moderation is enabled and an activation email has already been sent
523
        # so user can trigger resend of the activation email
524
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
525
                            follow=True)
526
        self.assertContains(r, 'has been sent to your email address.')
527
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
528

    
529
        # also she cannot login
530
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
531
        r = self.client.post(ui_url('local'), data, follow=True)
532
        self.assertContains(r, 'Resend activation')
533
        self.assertFalse(r.context['request'].user.is_authenticated())
534
        self.assertFalse('_pithos2_a' in self.client.cookies)
535

    
536
        # user sees the message and resends activation
537
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
538
                            follow=True)
539
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
540

    
541
        # logged in user cannot activate another account
542
        tmp_user = get_local_user("test_existing_user@synnefo.org")
543
        tmp_client = Client()
544
        tmp_client.login(username="test_existing_user@synnefo.org",
545
                         password="password")
546
        r = tmp_client.get(user.get_activation_url(), follow=True)
547
        self.assertContains(r, messages.LOGGED_IN_WARNING)
548

    
549
        r = self.client.get(user.get_activation_url(), follow=True)
550
        # previous code got invalidated
551
        self.assertEqual(r.status_code, 404)
552

    
553
        user = AstakosUser.objects.get(pk=user.pk)
554
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
555
        r = self.client.get(user.get_activation_url(), follow=True)
556
        self.assertRedirects(r, reverse('login'))
557
        # user sees that account is pending approval from admins
558
        self.assertContains(r, messages.NOTIFICATION_SENT)
559
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
560

    
561
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
562
        result = backend.handle_moderation(user)
563
        backend.send_result_notifications(result, user)
564
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
565
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
566

    
567
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
568
        r = self.client.get(ui_url('profile'), follow=True)
569
        self.assertFalse(r.context['request'].user.is_authenticated())
570
        self.assertFalse('_pithos2_a' in self.client.cookies)
571
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
572

    
573
        user = AstakosUser.objects.get(pk=user.pk)
574
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
575
                                               'password': 'password'},
576
                             follow=True)
577
        # user activated and logged in, token cookie set
578
        self.assertTrue(r.context['request'].user.is_authenticated())
579
        self.assertTrue('_pithos2_a' in self.client.cookies)
580
        cookies = self.client.cookies
581
        self.assertTrue(quote(user.auth_token) in
582
                        cookies.get('_pithos2_a').value)
583
        r = self.client.get(ui_url('logout'), follow=True)
584
        r = self.client.get(ui_url(''), follow=True)
585
        self.assertRedirects(r, ui_url('login'))
586
        # user logged out, token cookie removed
587
        self.assertFalse(r.context['request'].user.is_authenticated())
588
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
589

    
590
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
591
        del self.client.cookies['_pithos2_a']
592

    
593
        # user can login
594
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
595
                                               'password': 'password'},
596
                             follow=True)
597
        self.assertTrue(r.context['request'].user.is_authenticated())
598
        self.assertTrue('_pithos2_a' in self.client.cookies)
599
        cookies = self.client.cookies
600
        self.assertTrue(quote(user.auth_token) in
601
                        cookies.get('_pithos2_a').value)
602
        self.client.get(ui_url('logout'), follow=True)
603

    
604
        # user forgot password
605
        old_pass = user.password
606
        r = self.client.get(ui_url('local/password_reset'))
607
        self.assertEqual(r.status_code, 200)
608
        r = self.client.post(ui_url('local/password_reset'),
609
                             {'email': 'kpap@synnefo.org'})
610
        self.assertEqual(r.status_code, 302)
611
        # email sent
612
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
613

    
614
        # user visits change password link
615
        user = AstakosUser.objects.get(pk=user.pk)
616
        r = self.client.get(user.get_password_reset_url())
617
        r = self.client.post(user.get_password_reset_url(),
618
                             {'new_password1': 'newpass',
619
                              'new_password2': 'newpass'})
620

    
621
        user = AstakosUser.objects.get(pk=user.pk)
622
        self.assertNotEqual(old_pass, user.password)
623

    
624
        # old pass is not usable
625
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
626
                                               'password': 'password'})
627
        self.assertContains(r, 'Please enter a correct username and password')
628
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
629
                                               'password': 'newpass'},
630
                             follow=True)
631
        self.assertTrue(r.context['request'].user.is_authenticated())
632
        self.client.logout()
633

    
634
        # tests of special local backends
635
        user = AstakosUser.objects.get(pk=user.pk)
636
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
637
        user.save()
638

    
639
        # non astakos local backends do not support password reset
640
        r = self.client.get(ui_url('local/password_reset'))
641
        self.assertEqual(r.status_code, 200)
642
        r = self.client.post(ui_url('local/password_reset'),
643
                             {'email': 'kpap@synnefo.org'})
644
        # she can't because account is not active yet
645
        self.assertContains(r, "Changing password is not")
646

    
647

    
648
class UserActionsTests(TestCase):
649

    
650
    def test_email_change(self):
651
        # to test existing email validation
652
        get_local_user('existing@synnefo.org')
653

    
654
        # local user
655
        user = get_local_user('kpap@synnefo.org')
656

    
657
        # login as kpap
658
        self.client.login(username='kpap@synnefo.org', password='password')
659
        r = self.client.get(ui_url('profile'), follow=True)
660
        user = r.context['request'].user
661
        self.assertTrue(user.is_authenticated())
662

    
663
        # change email is enabled
664
        r = self.client.get(ui_url('email_change'))
665
        self.assertEqual(r.status_code, 200)
666
        self.assertFalse(user.email_change_is_pending())
667

    
668
        # request email change to an existing email fails
669
        data = {'new_email_address': 'existing@synnefo.org'}
670
        r = self.client.post(ui_url('email_change'), data)
671
        self.assertContains(r, messages.EMAIL_USED)
672

    
673
        # proper email change
674
        data = {'new_email_address': 'kpap@gmail.com'}
675
        r = self.client.post(ui_url('email_change'), data, follow=True)
676
        self.assertRedirects(r, ui_url('profile'))
677
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
678
        change1 = EmailChange.objects.get()
679

    
680
        # user sees a warning
681
        r = self.client.get(ui_url('email_change'))
682
        self.assertEqual(r.status_code, 200)
683
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
684
        self.assertTrue(user.email_change_is_pending())
685

    
686
        # link was sent
687
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
688
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
689

    
690
        # proper email change
691
        data = {'new_email_address': 'kpap@yahoo.com'}
692
        r = self.client.post(ui_url('email_change'), data, follow=True)
693
        self.assertRedirects(r, ui_url('profile'))
694
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
695
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
696
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
697
        change2 = EmailChange.objects.get()
698

    
699
        r = self.client.get(change1.get_url())
700
        self.assertEquals(r.status_code, 404)
701
        self.client.logout()
702

    
703
        invalid_client = Client()
704
        r = invalid_client.post(ui_url('local?'),
705
                                {'username': 'existing@synnefo.org',
706
                                 'password': 'password'})
707
        r = invalid_client.get(change2.get_url(), follow=True)
708
        self.assertEquals(r.status_code, 403)
709

    
710
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
711
                             {'username': 'kpap@synnefo.org',
712
                              'password': 'password',
713
                              'next': change2.get_url()},
714
                             follow=True)
715
        self.assertRedirects(r, ui_url('profile'))
716
        user = r.context['request'].user
717
        self.assertEquals(user.email, 'kpap@yahoo.com')
718
        self.assertEquals(user.username, 'kpap@yahoo.com')
719

    
720
        self.client.logout()
721
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
722
                             {'username': 'kpap@synnefo.org',
723
                              'password': 'password',
724
                              'next': change2.get_url()},
725
                             follow=True)
726
        self.assertContains(r, "Please enter a correct username and password")
727
        self.assertEqual(user.emailchanges.count(), 0)
728

    
729
        AstakosUser.objects.all().delete()
730
        Group.objects.all().delete()
731

    
732

    
733
TEST_TARGETED_ID1 = \
734
    "https://idp.synnefo.org/idp/shibboleth!ZWxhIHJlIGVsYSByZSBlbGEgcmU="
735
TEST_TARGETED_ID2 = \
736
    "https://idp.synnefo.org/idp/shibboleth!ZGUgc2UgeGFsYXNlLi4uLi4uLg=="
737
TEST_TARGETED_ID3 = \
738
    "https://idp.synnefo.org/idp/shibboleth!"
739

    
740

    
741
class TestAuthProviderViews(TestCase):
742

    
743
    def tearDown(self):
744
        AstakosUser.objects.all().delete()
745

    
746
    @im_settings(IM_MODULES=['shibboleth'], MODERATION_ENABLED=False,
747
                 SHIBBOLETH_MIGRATE_EPPN=True)
748
    def migrate_to_remote_id(self):
749
        eppn_user = get_local_user("eppn@synnefo.org")
750
        tid_user = get_local_user("tid@synnefo.org")
751
        eppn_user.add_auth_provider('shibboleth', 'EPPN')
752
        tid_user.add_auth_provider('shibboleth', TEST_TARGETED_ID1)
753

    
754
        get_user = lambda r: r.context['request'].user
755

    
756
        client = ShibbolethClient()
757
        client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID2)
758
        r = client.get(ui_url('login/shibboleth?'), follow=True)
759
        self.assertTrue(get_user(r).is_authenticated())
760
        self.assertEqual(eppn_user.get_auth_provider('shibboleth').identifier,
761
                         TEST_TARGETED_ID2)
762

    
763
        client = ShibbolethClient()
764
        client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID1)
765
        r = client.get(ui_url('login/shibboleth?'), follow=True)
766
        self.assertTrue(get_user(r).is_authenticated())
767
        self.assertEqual(tid_user.get_auth_provider('shibboleth').identifier,
768
                         TEST_TARGETED_ID1)
769

    
770
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
771
                         AUTOMODERATE_POLICY=True)
772
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
773
                 FORCE_PROFILE_UPDATE=False)
774
    def test_user(self):
775
        Profile = AuthProviderPolicyProfile
776
        Pending = PendingThirdPartyUser
777
        User = AstakosUser
778

    
779
        User.objects.create(email="newuser@synnefo.org")
780
        get_local_user("olduser@synnefo.org")
781
        cl_olduser = ShibbolethClient()
782
        get_local_user("olduser2@synnefo.org")
783
        ShibbolethClient()
784
        cl_newuser = ShibbolethClient()
785
        cl_newuser2 = Client()
786

    
787
        academic_group, created = Group.objects.get_or_create(
788
            name='academic-login')
789
        academic_users = academic_group.user_set
790
        assert created
791
        policy_only_academic = Profile.objects.add_policy('academic_strict',
792
                                                          'shibboleth',
793
                                                          academic_group,
794
                                                          exclusive=True,
795
                                                          login=False,
796
                                                          add=False)
797

    
798
        # new academic user
799
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
800
        cl_newuser.set_tokens(remote_user="newusereppn",
801
                              mail="newuser@synnefo.org",
802
                              surname="Lastname")
803
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
804
        initial = r.context['signup_form'].initial
805
        pending = Pending.objects.get()
806
        self.assertEqual(initial.get('last_name'), 'Lastname')
807
        self.assertEqual(initial.get('email'), 'newuser@synnefo.org')
808
        identifier = pending.third_party_identifier
809
        signup_data = {'third_party_identifier': identifier,
810
                       'first_name': 'Academic',
811
                       'third_party_token': pending.token,
812
                       'last_name': 'New User',
813
                       'provider': 'shibboleth'}
814
        r = cl_newuser.post(ui_url('signup'), signup_data)
815
        self.assertContains(r, "This field is required", )
816
        signup_data['email'] = 'olduser@synnefo.org'
817
        r = cl_newuser.post(ui_url('signup'), signup_data)
818
        self.assertContains(r, "already an account with this email", )
819
        signup_data['email'] = 'newuser@synnefo.org'
820
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
821
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
822
        self.assertEqual(r.status_code, 404)
823
        newuser = User.objects.get(email="newuser@synnefo.org")
824
        activation_link = newuser.get_activation_url()
825
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
826

    
827
        # new non-academic user
828
        signup_data = {'first_name': 'Non Academic',
829
                       'last_name': 'New User',
830
                       'provider': 'local',
831
                       'password1': 'password',
832
                       'password2': 'password'}
833
        signup_data['email'] = 'olduser@synnefo.org'
834
        r = cl_newuser2.post(ui_url('signup'), signup_data)
835
        self.assertContains(r, 'There is already an account with this '
836
                               'email address')
837
        signup_data['email'] = 'newuser@synnefo.org'
838
        r = cl_newuser2.post(ui_url('signup/'), signup_data)
839
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
840
        r = self.client.get(activation_link, follow=True)
841
        self.assertEqual(r.status_code, 404)
842
        newuser = User.objects.get(email="newuser@synnefo.org")
843
        self.assertTrue(newuser.activation_sent)
844

    
845
        # activation sent, user didn't open verification url so additional
846
        # registrations invalidate the previous signups.
847
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
848
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
849
        pending = Pending.objects.get()
850
        identifier = pending.third_party_identifier
851
        signup_data = {'third_party_identifier': identifier,
852
                       'first_name': 'Academic',
853
                       'third_party_token': pending.token,
854
                       'last_name': 'New User',
855
                       'provider': 'shibboleth'}
856
        signup_data['email'] = 'newuser@synnefo.org'
857
        r = cl_newuser.post(ui_url('signup'), signup_data)
858
        self.assertEqual(r.status_code, 302)
859
        newuser = User.objects.get(email="newuser@synnefo.org")
860
        self.assertTrue(newuser.activation_sent)
861
        activation_link = newuser.get_activation_url()
862
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
863
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
864
        self.assertRedirects(r, ui_url('landing'))
865
        newuser = User.objects.get(email="newuser@synnefo.org")
866
        self.assertEqual(newuser.is_active, True)
867
        self.assertEqual(newuser.email_verified, True)
868
        cl_newuser.logout()
869

    
870
        # cannot reactivate if suspended
871
        newuser.is_active = False
872
        newuser.save()
873
        r = cl_newuser.get(newuser.get_activation_url())
874
        newuser = User.objects.get(email="newuser@synnefo.org")
875
        self.assertFalse(newuser.is_active)
876

    
877
        # release suspension
878
        newuser.is_active = True
879
        newuser.save()
880

    
881
        cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
882
        local = auth.get_provider('local', newuser)
883
        self.assertEqual(local.get_add_policy, False)
884
        self.assertEqual(local.get_login_policy, False)
885
        r = cl_newuser.get(local.get_add_url, follow=True)
886
        self.assertRedirects(r, ui_url('profile'))
887
        self.assertContains(r, 'disabled for your')
888

    
889
        cl_olduser.login(username='olduser@synnefo.org', password="password")
890
        r = cl_olduser.get(ui_url('profile'), follow=True)
891
        self.assertEqual(r.status_code, 200)
892
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
893
        self.assertContains(r, 'Your request is missing a unique token')
894
        cl_olduser.set_tokens(remote_user="newusereppn")
895
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
896
        self.assertContains(r, 'already in use')
897
        cl_olduser.set_tokens(remote_user="oldusereppn")
898
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
899
        self.assertContains(r, 'Academic login enabled for this account')
900

    
901
        user = User.objects.get(email="olduser@synnefo.org")
902
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
903
        local_provider = user.get_auth_provider('local')
904
        self.assertEqual(shib_provider.get_remove_policy, True)
905
        self.assertEqual(local_provider.get_remove_policy, True)
906

    
907
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
908
                                                          'shibboleth',
909
                                                          academic_group,
910
                                                          remove=False)
911
        user.groups.add(academic_group)
912
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
913
        local_provider = user.get_auth_provider('local')
914
        self.assertEqual(shib_provider.get_remove_policy, False)
915
        self.assertEqual(local_provider.get_remove_policy, True)
916
        self.assertEqual(local_provider.get_login_policy, False)
917

    
918
        cl_olduser.logout()
919
        login_data = {'username': 'olduser@synnefo.org',
920
                      'password': 'password'}
921
        r = cl_olduser.post(ui_url('local'), login_data, follow=True)
922
        self.assertContains(r, "login/shibboleth'>Academic login")
923
        Group.objects.all().delete()
924

    
925

    
926
class TestAuthProvidersAPI(TestCase):
927
    """
928
    Test auth_providers module API
929
    """
930

    
931
    def tearDown(self):
932
        Group.objects.all().delete()
933

    
934
    @im_settings(IM_MODULES=['local', 'shibboleth'])
935
    def test_create(self):
936
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
937
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
938

    
939
        module = 'shibboleth'
940
        identifier = 'SHIB_UUID'
941
        provider_params = {
942
            'affiliation': 'UNIVERSITY',
943
            'info': {'age': 27}
944
        }
945
        provider = auth.get_provider(module, user2, identifier,
946
                                     **provider_params)
947
        provider.add_to_user()
948
        provider = auth.get_provider(module, user, identifier,
949
                                     **provider_params)
950
        provider.add_to_user()
951
        user.email_verified = True
952
        user.save()
953
        self.assertRaises(Exception, provider.add_to_user)
954
        provider = user.get_auth_provider(module, identifier)
955
        self.assertEqual(user.get_auth_provider(
956
            module, identifier)._instance.info.get('age'), 27)
957

    
958
        module = 'local'
959
        identifier = None
960
        provider_params = {'auth_backend': 'ldap', 'info':
961
                          {'office': 'A1'}}
962
        provider = auth.get_provider(module, user, identifier,
963
                                     **provider_params)
964
        provider.add_to_user()
965
        self.assertFalse(provider.get_add_policy)
966
        self.assertRaises(Exception, provider.add_to_user)
967

    
968
        shib = user.get_auth_provider('shibboleth',
969
                                      'SHIB_UUID')
970
        self.assertTrue(shib.get_remove_policy)
971

    
972
        local = user.get_auth_provider('local')
973
        self.assertTrue(local.get_remove_policy)
974

    
975
        local.remove_from_user()
976
        self.assertFalse(shib.get_remove_policy)
977
        self.assertRaises(Exception, shib.remove_from_user)
978

    
979
        provider = user.get_auth_providers()[0]
980
        self.assertRaises(Exception, provider.add_to_user)
981

    
982
    @im_settings(IM_MODULES=['local', 'shibboleth'])
983
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
984
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
985
                                                 'group2'])
986
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
987
                        CREATION_GROUPS_POLICY=['localgroup-create',
988
                                                'group-create'])
989
    def test_add_groups(self):
990
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
991
        provider = auth.get_provider('shibboleth', user, 'test123')
992
        provider.add_to_user()
993
        user = AstakosUser.objects.get()
994
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
995
                         sorted([u'group1', u'group2', u'group-create']))
996

    
997
        local = auth.get_provider('local', user)
998
        local.add_to_user()
999
        provider = user.get_auth_provider('shibboleth')
1000
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1001
        provider.remove_from_user()
1002
        user = AstakosUser.objects.get()
1003
        self.assertEqual(len(user.get_auth_providers()), 1)
1004
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1005
                         sorted([u'group-create', u'localgroup']))
1006

    
1007
        local = user.get_auth_provider('local')
1008
        self.assertRaises(Exception, local.remove_from_user)
1009
        provider = auth.get_provider('shibboleth', user, 'test123')
1010
        provider.add_to_user()
1011
        user = AstakosUser.objects.get()
1012
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1013
                         sorted([u'group-create', u'group1', u'group2',
1014
                                 u'localgroup']))
1015
        Group.objects.all().delete()
1016

    
1017
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1018
    def test_policies(self):
1019
        group_old, created = Group.objects.get_or_create(name='olduser')
1020

    
1021
        astakos_settings.MODERATION_ENABLED = True
1022
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1023
            ['academic-user']
1024
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1025
            ['google-user']
1026

    
1027
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
1028
        user.groups.add(group_old)
1029
        user.add_auth_provider('local')
1030

    
1031
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
1032
        user2.add_auth_provider('shibboleth', identifier='shibid')
1033

    
1034
        user3 = AstakosUser.objects.create(email="kpap3@synnefo.org")
1035
        user3.groups.add(group_old)
1036
        user3.add_auth_provider('local')
1037
        user3.add_auth_provider('shibboleth', identifier='1234')
1038

    
1039
        self.assertTrue(user2.groups.get(name='academic-user'))
1040
        self.assertFalse(user2.groups.filter(name='olduser').count())
1041

    
1042
        local = auth_providers.get_provider('local')
1043
        self.assertTrue(local.get_add_policy)
1044

    
1045
        academic_group = Group.objects.get(name='academic-user')
1046
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1047
                                                     academic_group,
1048
                                                     exclusive=True,
1049
                                                     add=False,
1050
                                                     login=False)
1051
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1052
                                                     academic_group,
1053
                                                     exclusive=True,
1054
                                                     login=False,
1055
                                                     add=False)
1056
        # no duplicate entry gets created
1057
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1058

    
1059
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1060
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1061
                                                     user2,
1062
                                                     remove=False)
1063
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1064

    
1065
        local = auth_providers.get_provider('local', user2)
1066
        google = auth_providers.get_provider('google', user2)
1067
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1068
        self.assertTrue(shibboleth.get_login_policy)
1069
        self.assertFalse(shibboleth.get_remove_policy)
1070
        self.assertFalse(local.get_add_policy)
1071
        self.assertFalse(local.get_add_policy)
1072
        self.assertFalse(google.get_add_policy)
1073

    
1074
        user2.groups.remove(Group.objects.get(name='academic-user'))
1075
        self.assertTrue(local.get_add_policy)
1076
        self.assertTrue(google.get_add_policy)
1077
        user2.groups.add(Group.objects.get(name='academic-user'))
1078

    
1079
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1080
                                                     user2,
1081
                                                     exclusive=True,
1082
                                                     add=True)
1083
        self.assertTrue(local.get_add_policy)
1084
        self.assertTrue(google.get_add_policy)
1085

    
1086
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1087
        self.assertFalse(local.get_automoderate_policy)
1088
        self.assertFalse(google.get_automoderate_policy)
1089
        self.assertTrue(shibboleth.get_automoderate_policy)
1090

    
1091
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1092
                  'GOOGLE_ADD_GROUPS_POLICY']:
1093
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1094

    
1095
    @shibboleth_settings(CREATE_POLICY=True)
1096
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1097
    def test_create_http(self):
1098
        # this should be wrapped inside a transaction
1099
        user = AstakosUser(email="test@test.com")
1100
        user.save()
1101
        provider = auth_providers.get_provider('shibboleth', user,
1102
                                               'test@academia.test')
1103
        provider.add_to_user()
1104
        user.get_auth_provider('shibboleth', 'test@academia.test')
1105
        provider = auth_providers.get_provider('local', user)
1106
        provider.add_to_user()
1107
        user.get_auth_provider('local')
1108

    
1109
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1110
        user = AstakosUser(email="test2@test.com")
1111
        user.save()
1112
        provider = auth_providers.get_provider('shibboleth', user,
1113
                                               'test@shibboleth.com',
1114
                                               **{'info': {'name':
1115
                                                           'User Test'}})
1116
        self.assertFalse(provider.get_create_policy)
1117
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1118
        self.assertTrue(provider.get_create_policy)
1119
        academic = provider.add_to_user()
1120

    
1121
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1122
    @shibboleth_settings(LIMIT_POLICY=2)
1123
    def test_policies(self):
1124
        user = get_local_user('kpap@synnefo.org')
1125
        user.add_auth_provider('shibboleth', identifier='1234')
1126
        user.add_auth_provider('shibboleth', identifier='12345')
1127

    
1128
        # default limit is 1
1129
        local = user.get_auth_provider('local')
1130
        self.assertEqual(local.get_add_policy, False)
1131

    
1132
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1133
        academic = user.get_auth_provider('shibboleth',
1134
                                          identifier='1234')
1135
        self.assertEqual(academic.get_add_policy, False)
1136
        newacademic = auth_providers.get_provider('shibboleth', user,
1137
                                                  identifier='123456')
1138
        self.assertEqual(newacademic.get_add_policy, True)
1139
        user.add_auth_provider('shibboleth', identifier='123456')
1140
        self.assertEqual(academic.get_add_policy, False)
1141
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1142

    
1143
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1144
    @shibboleth_settings(LIMIT_POLICY=2)
1145
    def test_messages(self):
1146
        user = get_local_user('kpap@synnefo.org')
1147
        user.add_auth_provider('shibboleth', identifier='1234')
1148
        user.add_auth_provider('shibboleth', identifier='12345')
1149
        provider = auth_providers.get_provider('shibboleth')
1150
        self.assertEqual(provider.get_message('title'), 'Academic')
1151
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1152
        # regenerate messages cache
1153
        provider = auth_providers.get_provider('shibboleth')
1154
        self.assertEqual(provider.get_message('title'), 'New title')
1155
        self.assertEqual(provider.get_message('login_title'),
1156
                         'New title LOGIN')
1157
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1158
        self.assertEqual(provider.get_module_icon,
1159
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1160
        self.assertEqual(provider.get_module_medium_icon,
1161
                         settings.MEDIA_URL +
1162
                         'im/auth/icons-medium/shibboleth.png')
1163

    
1164
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1165
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1166
        self.assertEqual(provider.get_method_details_msg,
1167
                         'Account: 12345')
1168
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1169
        self.assertEqual(provider.get_method_details_msg,
1170
                         'Account: 1234')
1171

    
1172
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1173
        self.assertEqual(provider.get_not_active_msg,
1174
                         "'Academic login' is disabled.")
1175

    
1176
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1177
    @shibboleth_settings(LIMIT_POLICY=2)
1178
    def test_templates(self):
1179
        user = get_local_user('kpap@synnefo.org')
1180
        user.add_auth_provider('shibboleth', identifier='1234')
1181
        user.add_auth_provider('shibboleth', identifier='12345')
1182

    
1183
        provider = auth_providers.get_provider('shibboleth')
1184
        self.assertEqual(provider.get_template('login'),
1185
                         'im/auth/shibboleth_login.html')
1186
        provider = auth_providers.get_provider('google')
1187
        self.assertEqual(provider.get_template('login'),
1188
                         'im/auth/generic_login.html')
1189

    
1190

    
1191
class TestActivationBackend(TestCase):
1192

    
1193
    def setUp(self):
1194
        # dummy call to pass through logging middleware
1195
        self.client.get(ui_url(''))
1196

    
1197
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1198
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1199
    def test_policies(self):
1200
        backend = activation_backends.get_backend()
1201

    
1202
        # email matches RE_USER_EMAIL_PATTERNS
1203
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1204
                               is_active=False, email_verified=False)
1205
        backend.handle_verification(user1, user1.verification_code)
1206
        self.assertEqual(user1.accepted_policy, 'email')
1207

    
1208
        # manually moderated
1209
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1210
                               is_active=False, email_verified=False)
1211

    
1212
        backend.handle_verification(user2, user2.verification_code)
1213
        self.assertEqual(user2.moderated, False)
1214
        backend.handle_moderation(user2)
1215
        self.assertEqual(user2.moderated, True)
1216
        self.assertEqual(user2.accepted_policy, 'manual')
1217

    
1218
        # autoaccept due to provider automoderate policy
1219
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1220
                               is_active=False, email_verified=False)
1221
        user3.auth_providers.all().delete()
1222
        user3.add_auth_provider('shibboleth', identifier='shib123')
1223
        backend.handle_verification(user3, user3.verification_code)
1224
        self.assertEqual(user3.moderated, True)
1225
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1226

    
1227
    @im_settings(MODERATION_ENABLED=False,
1228
                 MANAGERS=(('Manager',
1229
                            'manager@synnefo.org'),),
1230
                 HELPDESK=(('Helpdesk',
1231
                            'helpdesk@synnefo.org'),),
1232
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1233
    def test_without_moderation(self):
1234
        backend = activation_backends.get_backend()
1235
        form = backend.get_signup_form('local')
1236
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1237

    
1238
        user_data = {
1239
            'email': 'kpap@synnefo.org',
1240
            'first_name': 'Kostas Papas',
1241
            'password1': '123',
1242
            'password2': '123'
1243
        }
1244
        form = backend.get_signup_form('local', user_data)
1245
        user = form.save(commit=False)
1246
        form.store_user(user)
1247
        self.assertEqual(user.is_active, False)
1248
        self.assertEqual(user.email_verified, False)
1249

    
1250
        # step one, registration
1251
        result = backend.handle_registration(user)
1252
        user = AstakosUser.objects.get()
1253
        self.assertEqual(user.is_active, False)
1254
        self.assertEqual(user.email_verified, False)
1255
        self.assertTrue(user.verification_code)
1256
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1257
        backend.send_result_notifications(result, user)
1258
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1259
        self.assertEqual(len(mail.outbox), 1)
1260

    
1261
        # step two, verify email (automatically
1262
        # moderates/accepts user, since moderation is disabled)
1263
        user = AstakosUser.objects.get()
1264
        valid_code = user.verification_code
1265

    
1266
        # test invalid code
1267
        result = backend.handle_verification(user, valid_code)
1268
        backend.send_result_notifications(result, user)
1269
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1270
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1271
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1272
        # verification + activated + greeting = 3
1273
        self.assertEqual(len(mail.outbox), 3)
1274
        user = AstakosUser.objects.get()
1275
        self.assertEqual(user.is_active, True)
1276
        self.assertEqual(user.moderated, True)
1277
        self.assertTrue(user.moderated_at)
1278
        self.assertEqual(user.email_verified, True)
1279
        self.assertTrue(user.activation_sent)
1280

    
1281
    @im_settings(MODERATION_ENABLED=True,
1282
                 MANAGERS=(('Manager',
1283
                            'manager@synnefo.org'),),
1284
                 HELPDESK=(('Helpdesk',
1285
                            'helpdesk@synnefo.org'),),
1286
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1287
    def test_with_moderation(self):
1288

    
1289
        backend = activation_backends.get_backend()
1290
        form = backend.get_signup_form('local')
1291
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1292

    
1293
        user_data = {
1294
            'email': 'kpap@synnefo.org',
1295
            'first_name': 'Kostas Papas',
1296
            'password1': '123',
1297
            'password2': '123'
1298
        }
1299
        form = backend.get_signup_form(provider='local',
1300
                                       initial_data=user_data)
1301
        user = form.save(commit=False)
1302
        form.store_user(user)
1303
        self.assertEqual(user.is_active, False)
1304
        self.assertEqual(user.email_verified, False)
1305

    
1306
        # step one, registration
1307
        result = backend.handle_registration(user)
1308
        user = AstakosUser.objects.get()
1309
        self.assertEqual(user.is_active, False)
1310
        self.assertEqual(user.email_verified, False)
1311
        self.assertTrue(user.verification_code)
1312
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1313
        backend.send_result_notifications(result, user)
1314
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1315
        self.assertEqual(len(mail.outbox), 1)
1316

    
1317
        # step two, verifying email
1318
        user = AstakosUser.objects.get()
1319
        valid_code = user.verification_code
1320
        invalid_code = user.verification_code + 'invalid'
1321

    
1322
        # test invalid code
1323
        result = backend.handle_verification(user, invalid_code)
1324
        self.assertEqual(result.status, backend.Result.ERROR)
1325
        backend.send_result_notifications(result, user)
1326
        user = AstakosUser.objects.get()
1327
        self.assertEqual(user.is_active, False)
1328
        self.assertEqual(user.moderated, False)
1329
        self.assertEqual(user.moderated_at, None)
1330
        self.assertEqual(user.email_verified, False)
1331
        self.assertTrue(user.activation_sent)
1332

    
1333
        # test valid code
1334
        user = AstakosUser.objects.get()
1335
        result = backend.handle_verification(user, valid_code)
1336
        backend.send_result_notifications(result, user)
1337
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1338
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1339
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1340
        self.assertEqual(len(mail.outbox), 2)
1341
        user = AstakosUser.objects.get()
1342
        self.assertEqual(user.moderated, False)
1343
        self.assertEqual(user.moderated_at, None)
1344
        self.assertEqual(user.email_verified, True)
1345
        self.assertTrue(user.activation_sent)
1346

    
1347
        # test code reuse
1348
        result = backend.handle_verification(user, valid_code)
1349
        self.assertEqual(result.status, backend.Result.ERROR)
1350
        user = AstakosUser.objects.get()
1351
        self.assertEqual(user.is_active, False)
1352
        self.assertEqual(user.moderated, False)
1353
        self.assertEqual(user.moderated_at, None)
1354
        self.assertEqual(user.email_verified, True)
1355
        self.assertTrue(user.activation_sent)
1356

    
1357
        # valid code on verified user
1358
        user = AstakosUser.objects.get()
1359
        valid_code = user.verification_code
1360
        result = backend.handle_verification(user, valid_code)
1361
        self.assertEqual(result.status, backend.Result.ERROR)
1362

    
1363
        # step three, moderation user
1364
        user = AstakosUser.objects.get()
1365
        result = backend.handle_moderation(user)
1366
        backend.send_result_notifications(result, user)
1367

    
1368
        user = AstakosUser.objects.get()
1369
        self.assertEqual(user.is_active, True)
1370
        self.assertEqual(user.moderated, True)
1371
        self.assertTrue(user.moderated_at)
1372
        self.assertEqual(user.email_verified, True)
1373
        self.assertTrue(user.activation_sent)
1374

    
1375

    
1376
class TestWebloginRedirect(TestCase):
1377

    
1378
    @with_settings(settings, COOKIE_DOMAIN='.astakos.synnefo.org')
1379
    def test_restricts_domains(self):
1380
        get_local_user('user1@synnefo.org')
1381

    
1382
        # next url construct helpers
1383
        weblogin = lambda nxt: reverse('weblogin') + '?next=%s' % nxt
1384
        weblogin_quoted = lambda nxt: reverse('weblogin') + '?next=%s' % \
1385
            urllib.quote_plus(nxt)
1386

    
1387
        # common cases
1388
        invalid_domain = weblogin("https://www.invaliddomain.synnefo.org")
1389
        invalid_scheme = weblogin("customscheme://localhost")
1390
        invalid_scheme_with_valid_domain = \
1391
            weblogin("http://www.invaliddomain.com")
1392
        valid_scheme = weblogin("pithos://localhost/")
1393
        # to be used in assertRedirects
1394
        valid_scheme_quoted = weblogin_quoted("pithos://localhost/")
1395

    
1396
        # not authenticated, redirects to login which contains next param with
1397
        # additional nested quoted next params
1398
        r = self.client.get(valid_scheme, follow=True)
1399
        login_redirect = reverse('login') + '?next=' + \
1400
            urllib.quote_plus("http://testserver" + valid_scheme_quoted)
1401
        self.assertRedirects(r, login_redirect)
1402

    
1403
        # authenticate client
1404
        self.client.login(username="user1@synnefo.org", password="password")
1405

    
1406
        # valid scheme
1407
        r = self.client.get(valid_scheme, follow=True)
1408
        url = r.redirect_chain[1][0]
1409
        # scheme preserved
1410
        self.assertTrue(url.startswith('pithos://localhost/'))
1411
        # redirect contains token param
1412
        params = urlparse.urlparse(url.replace('pithos', 'https'),
1413
                                   scheme='https').query
1414
        params = urlparse.parse_qs(params)
1415
        self.assertEqual(params['token'][0],
1416
                         AstakosUser.objects.get().auth_token)
1417
        # does not contain uuid
1418
        # reverted for 0.14.2 to support old pithos desktop clients
1419
        #self.assertFalse('uuid' in params)
1420

    
1421
        # invalid cases
1422
        r = self.client.get(invalid_scheme, follow=True)
1423
        self.assertEqual(r.status_code, 403)
1424

    
1425
        r = self.client.get(invalid_scheme_with_valid_domain, follow=True)
1426
        self.assertEqual(r.status_code, 403)
1427

    
1428
        r = self.client.get(invalid_domain, follow=True)
1429
        self.assertEqual(r.status_code, 403)