Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ bd2c6bc5

History | View | Annotate | Download (65.1 kB)

1
# -*- coding: utf-8 -*-
2
# Copyright 2011, 2012, 2013 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
import urlparse
36
import urllib
37

    
38
from astakos.im.tests.common import *
39

    
40
ui_url = lambda url: '/' + astakos_settings.BASE_PATH + '/ui/%s' % url
41

    
42

    
43
class ShibbolethTests(TestCase):
44
    """
45
    Testing shibboleth authentication.
46
    """
47

    
48
    def setUp(self):
49
        self.client = ShibbolethClient()
50
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
51
        astakos_settings.MODERATION_ENABLED = True
52

    
53
    def tearDown(self):
54
        AstakosUser.objects.all().delete()
55

    
56
    @im_settings(FORCE_PROFILE_UPDATE=False)
57
    def test_create_account(self):
58

    
59
        client = ShibbolethClient()
60

    
61
        # shibboleth views validation
62
        # eepn required
63
        r = client.get(ui_url('login/shibboleth?'), follow=True)
64
        self.assertContains(r, messages.SHIBBOLETH_MISSING_USER_ID % {
65
            'domain': astakos_settings.BASE_HOST,
66
            'contact_email': settings.CONTACT_EMAIL
67
        })
68
        client.set_tokens(remote_user="kpapeppn", eppn="kpapeppn")
69

    
70
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
71
        # shibboleth user info required
72
        r = client.get(ui_url('login/shibboleth?'), follow=True)
73
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
74
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
75

    
76
        # shibboleth logged us in
77
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
78
                          cn="Kostas Papadimitriou",
79
                          ep_affiliation="Test Affiliation")
80
        r = client.get(ui_url('login/shibboleth?'), follow=True,
81
                       **{'HTTP_SHIB_CUSTOM_IDP_KEY': 'test'})
82
        token = PendingThirdPartyUser.objects.get().token
83
        self.assertRedirects(r, ui_url('signup?third_party_token=%s' % token))
84
        self.assertEqual(r.status_code, 200)
85

    
86
        # a new pending user created
87
        pending_user = PendingThirdPartyUser.objects.get(
88
            third_party_identifier="kpapeppn")
89
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
90
        # keep the token for future use
91
        token = pending_user.token
92
        # from now on no shibboleth headers are sent to the server
93
        client.reset_tokens()
94

    
95
        # this is the old way, it should fail, to avoid pending user take over
96
        r = client.get(ui_url('shibboleth/signup/%s' % pending_user.username))
97
        self.assertEqual(r.status_code, 404)
98

    
99
        # this is the signup unique url associated with the pending user
100
        # created
101
        r = client.get(ui_url('signup/?third_party_token=%s' % token))
102
        identifier = pending_user.third_party_identifier
103
        post_data = {'third_party_identifier': identifier,
104
                     'first_name': 'Kostas',
105
                     'third_party_token': token,
106
                     'last_name': 'Mitroglou',
107
                     'provider': 'shibboleth'}
108

    
109
        signup_url = reverse('signup')
110

    
111
        # invlid email
112
        post_data['email'] = 'kpap'
113
        r = client.post(signup_url, post_data)
114
        self.assertContains(r, token)
115

    
116
        # existing email
117
        existing_user = get_local_user('test@test.com')
118
        post_data['email'] = 'test@test.com'
119
        r = client.post(signup_url, post_data)
120
        self.assertContains(r, messages.EMAIL_USED)
121
        existing_user.delete()
122

    
123
        # and finally a valid signup
124
        post_data['email'] = 'kpap@synnefo.org'
125
        r = client.post(signup_url, post_data, follow=True)
126
        self.assertContains(r, messages.VERIFICATION_SENT)
127

    
128
        # entires commited as expected
129
        self.assertEqual(AstakosUser.objects.count(), 1)
130
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
131
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
132

    
133
        user = AstakosUser.objects.get()
134
        provider = user.get_auth_provider("shibboleth")
135
        headers = provider.provider_details['info']['headers']
136
        self.assertEqual(headers.get('SHIB_CUSTOM_IDP_KEY'), 'test')
137

    
138
        # provider info stored
139
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
140
        self.assertEqual(provider.affiliation, 'Test Affiliation')
141
        self.assertEqual(provider.info['email'], u'kpap@synnefo.org')
142
        self.assertEqual(provider.info['eppn'], u'kpapeppn')
143
        self.assertEqual(provider.info['name'], u'Kostas Papadimitriou')
144
        self.assertTrue('headers' in provider.info)
145

    
146
        # login (not activated yet)
147
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
148
                          cn="Kostas Papadimitriou")
149
        r = client.get(ui_url("login/shibboleth?"), follow=True)
150
        self.assertContains(r, 'is pending moderation')
151

    
152
        # admin activates the user
153
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
154
        backend = activation_backends.get_backend()
155
        activation_result = backend.verify_user(u, u.verification_code)
156
        activation_result = backend.accept_user(u)
157
        self.assertFalse(activation_result.is_error())
158
        backend.send_result_notifications(activation_result, u)
159
        self.assertEqual(u.is_active, True)
160

    
161
        # we see our profile
162
        r = client.get(ui_url("login/shibboleth?"), follow=True)
163
        self.assertRedirects(r, ui_url('landing'))
164
        self.assertEqual(r.status_code, 200)
165

    
166
    def test_existing(self):
167
        """
168
        Test adding of third party login to an existing account
169
        """
170

    
171
        # this is our existing user
172
        existing_user = get_local_user('kpap@synnefo.org')
173
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
174
        existing_inactive.is_active = False
175
        existing_inactive.save()
176

    
177
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
178
        existing_unverified.is_active = False
179
        existing_unverified.activation_sent = None
180
        existing_unverified.email_verified = False
181
        existing_unverified.is_verified = False
182
        existing_unverified.save()
183

    
184
        client = ShibbolethClient()
185
        # shibboleth logged us in, notice that we use different email
186
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
187
                          cn="Kostas Papadimitriou", )
188
        r = client.get(ui_url("login/shibboleth?"), follow=True)
189

    
190
        # a new pending user created
191
        pending_user = PendingThirdPartyUser.objects.get()
192
        token = pending_user.token
193
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
194
        pending_key = pending_user.token
195
        client.reset_tokens()
196
        self.assertRedirects(r, ui_url("signup?third_party_token=%s" % token))
197

    
198
        form = r.context['login_form']
199
        signupdata = copy.copy(form.initial)
200
        signupdata['email'] = 'kpap@synnefo.org'
201
        signupdata['third_party_token'] = token
202
        signupdata['provider'] = 'shibboleth'
203
        signupdata.pop('id', None)
204

    
205
        # the email exists to another user
206
        r = client.post(ui_url("signup"), signupdata)
207
        self.assertContains(r, "There is already an account with this email "
208
                               "address")
209
        # change the case, still cannot create
210
        signupdata['email'] = 'KPAP@synnefo.org'
211
        r = client.post(ui_url("signup"), signupdata)
212
        self.assertContains(r, "There is already an account with this email "
213
                               "address")
214
        # inactive user
215
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
216
        r = client.post(ui_url("signup"), signupdata)
217
        self.assertContains(r, "There is already an account with this email "
218
                               "address")
219

    
220
        # unverified user, this should pass, old entry will be deleted
221
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
222
        r = client.post(ui_url("signup"), signupdata)
223

    
224
        post_data = {'password': 'password',
225
                     'username': 'kpap@synnefo.org'}
226
        r = client.post(ui_url('local'), post_data, follow=True)
227
        self.assertTrue(r.context['request'].user.is_authenticated())
228
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
229
                          cn="Kostas Papadimitriou", )
230
        r = client.get(ui_url("login/shibboleth?"), follow=True)
231
        self.assertContains(r, "enabled for this account")
232
        client.reset_tokens()
233

    
234
        user = existing_user
235
        self.assertTrue(user.has_auth_provider('shibboleth'))
236
        self.assertTrue(user.has_auth_provider('local',
237
                                               auth_backend='astakos'))
238
        client.logout()
239

    
240
        # look Ma, i can login with both my shibboleth and local account
241
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
242
                          cn="Kostas Papadimitriou")
243
        r = client.get(ui_url("login/shibboleth?"), follow=True)
244
        self.assertTrue(r.context['request'].user.is_authenticated())
245
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
246
        self.assertRedirects(r, ui_url('landing'))
247
        self.assertEqual(r.status_code, 200)
248

    
249
        user = r.context['request'].user
250
        client.logout()
251
        client.reset_tokens()
252

    
253
        # logged out
254
        r = client.get(ui_url("profile"), follow=True)
255
        self.assertFalse(r.context['request'].user.is_authenticated())
256

    
257
        # login with local account also works
258
        post_data = {'password': 'password',
259
                     'username': 'kpap@synnefo.org'}
260
        r = self.client.post(ui_url('local'), post_data, follow=True)
261
        self.assertTrue(r.context['request'].user.is_authenticated())
262
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
263
        self.assertRedirects(r, ui_url('landing'))
264
        self.assertEqual(r.status_code, 200)
265

    
266
        # cannot add the same eppn
267
        client.set_tokens(mail="secondary@shibboleth.gr",
268
                          remote_user="kpapeppn",
269
                          cn="Kostas Papadimitriou", )
270
        r = client.get(ui_url("login/shibboleth?"), follow=True)
271
        self.assertRedirects(r, ui_url('landing'))
272
        self.assertTrue(r.status_code, 200)
273
        self.assertEquals(existing_user.auth_providers.count(), 2)
274

    
275
        # only one allowed by default
276
        client.set_tokens(mail="secondary@shibboleth.gr",
277
                          remote_user="kpapeppn2",
278
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
279
        prov = auth_providers.get_provider('shibboleth')
280
        r = client.get(ui_url("login/shibboleth?"), follow=True)
281
        self.assertContains(r, "Failed to add")
282
        self.assertRedirects(r, ui_url('profile'))
283
        self.assertTrue(r.status_code, 200)
284
        self.assertEquals(existing_user.auth_providers.count(), 2)
285
        client.logout()
286
        client.reset_tokens()
287

    
288
        # cannot login with another eppn
289
        client.set_tokens(mail="kpap@synnefo.org",
290
                          remote_user="kpapeppninvalid",
291
                          cn="Kostas Papadimitriou")
292
        r = client.get(ui_url("login/shibboleth?"), follow=True)
293
        self.assertFalse(r.context['request'].user.is_authenticated())
294

    
295
        # cannot
296

    
297
        # lets remove local password
298
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
299
                                       email="kpap@synnefo.org")
300
        remove_local_url = user.get_auth_provider('local').get_remove_url
301
        remove_shibbo_url = user.get_auth_provider('shibboleth',
302
                                                   'kpapeppn').get_remove_url
303
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
304
                          cn="Kostas Papadimtriou")
305
        r = client.get(ui_url("login/shibboleth?"), follow=True)
306
        client.reset_tokens()
307

    
308
        # only POST is allowed (for CSRF protection)
309
        r = client.get(remove_local_url, follow=True)
310
        self.assertEqual(r.status_code, 405)
311

    
312
        r = client.post(remove_local_url, follow=True)
313
        # 2 providers left
314
        self.assertEqual(user.auth_providers.count(), 1)
315
        # cannot remove last provider
316
        r = client.post(remove_shibbo_url)
317
        self.assertEqual(r.status_code, 403)
318
        self.client.logout()
319

    
320
        # cannot login using local credentials (notice we use another client)
321
        post_data = {'password': 'password',
322
                     'username': 'kpap@synnefo.org'}
323
        r = self.client.post(ui_url('local'), post_data, follow=True)
324
        self.assertFalse(r.context['request'].user.is_authenticated())
325

    
326
        # we can reenable the local provider by setting a password
327
        r = client.get(ui_url("password_change"), follow=True)
328
        r = client.post(ui_url("password_change"), {'new_password1': '111',
329
                                                    'new_password2': '111'},
330
                        follow=True)
331
        user = r.context['request'].user
332
        self.assertTrue(user.has_auth_provider('local'))
333
        self.assertTrue(user.has_auth_provider('shibboleth'))
334
        self.assertTrue(user.check_password('111'))
335
        self.assertTrue(user.has_usable_password())
336

    
337
        # change password via profile form
338
        r = client.post(ui_url("profile"), {
339
            'old_password': '111',
340
            'new_password': '',
341
            'new_password2': '',
342
            'change_password': 'on',
343
        }, follow=False)
344
        self.assertEqual(r.status_code, 200)
345
        self.assertFalse(r.context['profile_form'].is_valid())
346

    
347
        self.client.logout()
348

    
349
        # now we can login
350
        post_data = {'password': '111',
351
                     'username': 'kpap@synnefo.org'}
352
        r = self.client.post(ui_url('local'), post_data, follow=True)
353
        self.assertTrue(r.context['request'].user.is_authenticated())
354

    
355
        client.reset_tokens()
356

    
357
        # we cannot take over another shibboleth identifier
358
        user2 = get_local_user('another@synnefo.org')
359
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
360
        # login
361
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
362
                          cn="Kostas Papadimitriou")
363
        r = client.get(ui_url("login/shibboleth?"), follow=True)
364
        # try to assign existing shibboleth identifier of another user
365
        client.set_tokens(mail="kpap_second@shibboleth.gr",
366
                          remote_user="existingeppn",
367
                          cn="Kostas Papadimitriou")
368
        r = client.get(ui_url("login/shibboleth?"), follow=True)
369
        self.assertContains(r, "is already in use")
370

    
371

    
372
class TestLocal(TestCase):
373

    
374
    def setUp(self):
375
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
376
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
377
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
378
        settings.ASTAKOS_MODERATION_ENABLED = True
379

    
380
    def tearDown(self):
381
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
382
        AstakosUser.objects.all().delete()
383

    
384
    @im_settings(RECAPTCHA_ENABLED=True, RATELIMIT_RETRIES_ALLOWED=3)
385
    def test_login_ratelimit(self):
386
        from django.core.cache import cache
387
        cache.clear()
388
        [cache.delete(key) for key in cache._cache.keys()]
389

    
390
        credentials = {'username': 'γιού τι έφ', 'password': 'password'}
391
        r = self.client.post(ui_url('local'), credentials, follow=True)
392
        fields = r.context['login_form'].fields.keyOrder
393
        self.assertFalse('recaptcha_challenge_field' in fields)
394
        r = self.client.post(ui_url('local'), credentials, follow=True)
395
        fields = r.context['login_form'].fields.keyOrder
396
        self.assertFalse('recaptcha_challenge_field' in fields)
397
        r = self.client.post(ui_url('local'), credentials, follow=True)
398
        fields = r.context['login_form'].fields.keyOrder
399
        self.assertTrue('recaptcha_challenge_field' in fields)
400

    
401
    def test_no_moderation(self):
402
        # disable moderation
403
        astakos_settings.MODERATION_ENABLED = False
404

    
405
        # create a new user
406
        r = self.client.get(ui_url("signup"))
407
        self.assertEqual(r.status_code, 200)
408
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
409
                'password2': 'password', 'first_name': 'Kostas',
410
                'last_name': 'Mitroglou', 'provider': 'local'}
411
        r = self.client.post(ui_url("signup"), data)
412

    
413
        # user created
414
        self.assertEqual(AstakosUser.objects.count(), 1)
415
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
416
                                       email="kpap@synnefo.org")
417
        self.assertEqual(user.username, 'kpap@synnefo.org')
418
        self.assertEqual(user.has_auth_provider('local'), True)
419
        self.assertFalse(user.is_active)
420

    
421
        # user (but not admin) gets notified
422
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
423
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
424
        astakos_settings.MODERATION_ENABLED = True
425

    
426
    def test_email_case(self):
427
        data = {
428
            'email': 'kPap@synnefo.org',
429
            'password1': '1234',
430
            'password2': '1234'
431
        }
432

    
433
        form = forms.LocalUserCreationForm(data)
434
        self.assertTrue(form.is_valid())
435
        user = form.create_user()
436

    
437
        u = AstakosUser.objects.get()
438
        self.assertEqual(u.email, 'kPap@synnefo.org')
439
        self.assertEqual(u.username, 'kpap@synnefo.org')
440
        u.is_active = True
441
        u.email_verified = True
442
        u.save()
443

    
444
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
445
        login = forms.LoginForm(data=data)
446
        self.assertTrue(login.is_valid())
447

    
448
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
449
        login = forms.LoginForm(data=data)
450
        self.assertTrue(login.is_valid())
451

    
452
        data = {
453
            'email': 'kpap@synnefo.org',
454
            'password1': '1234',
455
            'password2': '1234'
456
        }
457
        form = forms.LocalUserCreationForm(data)
458
        self.assertFalse(form.is_valid())
459

    
460
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
461
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
462
    def test_local_provider(self):
463
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
464

    
465
        # create a user
466
        r = self.client.get(ui_url("signup"))
467
        self.assertEqual(r.status_code, 200)
468
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
469
                'password2': 'password', 'first_name': 'Kostas',
470
                'last_name': 'Mitroglou', 'provider': 'local'}
471
        r = self.client.post(ui_url("signup"), data)
472

    
473
        # user created
474
        self.assertEqual(AstakosUser.objects.count(), 1)
475
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
476
                                       email="kpap@synnefo.org")
477
        self.assertEqual(user.username, 'kpap@synnefo.org')
478
        self.assertEqual(user.has_auth_provider('local'), True)
479
        self.assertFalse(user.is_active)  # not activated
480
        self.assertFalse(user.email_verified)  # not verified
481
        self.assertTrue(user.activation_sent)  # activation automatically sent
482
        self.assertFalse(user.moderated)
483
        self.assertFalse(user.email_verified)
484

    
485
        # admin gets notified and activates the user from the command line
486
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
487
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
488
                                               'password': 'password'},
489
                             follow=True)
490
        self.assertContains(r, messages.VERIFICATION_SENT)
491
        backend = activation_backends.get_backend()
492

    
493
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
494
        backend.send_user_verification_email(user)
495

    
496
        # user activation fields updated and user gets notified via email
497
        user = AstakosUser.objects.get(pk=user.pk)
498
        self.assertTrue(user.activation_sent)
499
        self.assertFalse(user.email_verified)
500
        self.assertFalse(user.is_active)
501
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
502

    
503
        # user forgot she got registered and tries to submit registration
504
        # form. Notice the upper case in email
505
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
506
                'password2': 'password', 'first_name': 'Kostas',
507
                'last_name': 'Mitroglou', 'provider': 'local'}
508
        r = self.client.post(ui_url("signup"), data, follow=True)
509
        self.assertRedirects(r, reverse('login'))
510
        self.assertContains(r, messages.VERIFICATION_SENT)
511

    
512
        user = AstakosUser.objects.get()
513
        # previous user replaced
514
        self.assertTrue(user.activation_sent)
515
        self.assertFalse(user.email_verified)
516
        self.assertFalse(user.is_active)
517
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
518

    
519
        # hmmm, email exists; lets request a password change
520
        r = self.client.get(ui_url('local/password_reset'))
521
        self.assertEqual(r.status_code, 200)
522
        data = {'email': 'kpap@synnefo.org'}
523
        r = self.client.post(ui_url('local/password_reset'), data, follow=True)
524
        # she can't because account is not active yet
525
        self.assertContains(r, 'pending activation')
526

    
527
        # moderation is enabled and an activation email has already been sent
528
        # so user can trigger resend of the activation email
529
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
530
                            follow=True)
531
        self.assertContains(r, 'has been sent to your email address.')
532
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
533

    
534
        # also she cannot login
535
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
536
        r = self.client.post(ui_url('local'), data, follow=True)
537
        self.assertContains(r, 'Resend activation')
538
        self.assertFalse(r.context['request'].user.is_authenticated())
539
        self.assertFalse('_pithos2_a' in self.client.cookies)
540

    
541
        # user sees the message and resends activation
542
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
543
                            follow=True)
544
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
545

    
546
        # logged in user cannot activate another account
547
        tmp_user = get_local_user("test_existing_user@synnefo.org")
548
        tmp_client = Client()
549
        tmp_client.login(username="test_existing_user@synnefo.org",
550
                         password="password")
551
        r = tmp_client.get(user.get_activation_url(), follow=True)
552
        self.assertContains(r, messages.LOGGED_IN_WARNING)
553

    
554
        # empty activation code is not allowed
555
        r = self.client.get(user.get_activation_url().split("?")[0],
556
                            follow=True)
557
        self.assertEqual(r.status_code, 403)
558

    
559
        r = self.client.get(user.get_activation_url(), follow=True)
560
        # previous code got invalidated
561
        self.assertEqual(r.status_code, 404)
562

    
563
        user = AstakosUser.objects.get(pk=user.pk)
564
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
565
        r = self.client.get(user.get_activation_url(), follow=True)
566
        self.assertRedirects(r, reverse('login'))
567
        # user sees that account is pending approval from admins
568
        self.assertContains(r, messages.NOTIFICATION_SENT)
569
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
570

    
571
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
572
        result = backend.handle_moderation(user)
573
        backend.send_result_notifications(result, user)
574
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
575
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
576

    
577
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
578
        r = self.client.get(ui_url('profile'), follow=True)
579
        self.assertFalse(r.context['request'].user.is_authenticated())
580
        self.assertFalse('_pithos2_a' in self.client.cookies)
581
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
582

    
583
        user = AstakosUser.objects.get(pk=user.pk)
584
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
585
                                               'password': 'password'},
586
                             follow=True)
587
        # user activated and logged in, token cookie set
588
        self.assertTrue(r.context['request'].user.is_authenticated())
589
        self.assertTrue('_pithos2_a' in self.client.cookies)
590
        cookies = self.client.cookies
591
        self.assertTrue(quote(user.auth_token) in
592
                        cookies.get('_pithos2_a').value)
593
        r = self.client.get(ui_url('logout'), follow=True)
594
        r = self.client.get(ui_url(''), follow=True)
595
        self.assertRedirects(r, ui_url('login'))
596
        # user logged out, token cookie removed
597
        self.assertFalse(r.context['request'].user.is_authenticated())
598
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
599

    
600
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
601
        del self.client.cookies['_pithos2_a']
602

    
603
        # user can login
604
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
605
                                               'password': 'password'},
606
                             follow=True)
607
        self.assertTrue(r.context['request'].user.is_authenticated())
608
        self.assertTrue('_pithos2_a' in self.client.cookies)
609
        cookies = self.client.cookies
610
        self.assertTrue(quote(user.auth_token) in
611
                        cookies.get('_pithos2_a').value)
612
        self.client.get(ui_url('logout'), follow=True)
613

    
614
        # user forgot password
615
        old_pass = user.password
616
        r = self.client.get(ui_url('local/password_reset'))
617
        self.assertEqual(r.status_code, 200)
618
        r = self.client.post(ui_url('local/password_reset'),
619
                             {'email': 'kpap@synnefo.org'})
620
        self.assertEqual(r.status_code, 302)
621
        # email sent
622
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
623

    
624
        # user visits change password link
625
        user = AstakosUser.objects.get(pk=user.pk)
626
        r = self.client.get(user.get_password_reset_url())
627
        r = self.client.post(user.get_password_reset_url(),
628
                             {'new_password1': 'newpass',
629
                              'new_password2': 'newpass'})
630

    
631
        user = AstakosUser.objects.get(pk=user.pk)
632
        self.assertNotEqual(old_pass, user.password)
633

    
634
        # old pass is not usable
635
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
636
                                               'password': 'password'})
637
        self.assertContains(r, 'Please enter a correct username and password')
638
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
639
                                               'password': 'newpass'},
640
                             follow=True)
641
        self.assertTrue(r.context['request'].user.is_authenticated())
642
        self.client.logout()
643

    
644
        # tests of special local backends
645
        user = AstakosUser.objects.get(pk=user.pk)
646
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
647
        user.save()
648

    
649
        # non astakos local backends do not support password reset
650
        r = self.client.get(ui_url('local/password_reset'))
651
        self.assertEqual(r.status_code, 200)
652
        r = self.client.post(ui_url('local/password_reset'),
653
                             {'email': 'kpap@synnefo.org'})
654
        # she can't because account is not active yet
655
        self.assertContains(r, "Changing password is not")
656

    
657
    def test_fix_superuser(self):
658
        u = User.objects.create(username="dummy", email="email@example.org",
659
                                first_name="Super", last_name="User",
660
                                is_superuser=True)
661
        User.objects.create(username="dummy2", email="email2@example.org",
662
                            first_name="Other", last_name="User")
663
        fixed = auth_functions.fix_superusers()
664
        self.assertEqual(len(fixed), 1)
665
        fuser = fixed[0]
666
        self.assertEqual(fuser.email, fuser.username)
667

    
668

    
669
class UserActionsTests(TestCase):
670

    
671
    def test_email_validation(self):
672
        backend = activation_backends.get_backend()
673
        form = backend.get_signup_form('local')
674
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
675
        user_data = {
676
            'email': 'kpap@synnefo.org',
677
            'first_name': 'Kostas Papas',
678
            'password1': '123',
679
            'password2': '123'
680
        }
681
        form = backend.get_signup_form(provider='local',
682
                                       initial_data=user_data)
683
        self.assertTrue(form.is_valid())
684
        user_data['email'] = 'kpap@synnefo.org.'
685
        form = backend.get_signup_form(provider='local',
686
                                       initial_data=user_data)
687
        self.assertFalse(form.is_valid())
688

    
689
    def test_email_change(self):
690
        # to test existing email validation
691
        get_local_user('existing@synnefo.org')
692

    
693
        # local user
694
        user = get_local_user('kpap@synnefo.org')
695

    
696
        # login as kpap
697
        self.client.login(username='kpap@synnefo.org', password='password')
698
        r = self.client.get(ui_url('profile'), follow=True)
699
        user = r.context['request'].user
700
        self.assertTrue(user.is_authenticated())
701

    
702
        # change email is enabled
703
        r = self.client.get(ui_url('email_change'))
704
        self.assertEqual(r.status_code, 200)
705
        self.assertFalse(user.email_change_is_pending())
706

    
707
        # invalid email format
708
        data = {'new_email_address': 'existing@synnefo.org.'}
709
        r = self.client.post(ui_url('email_change'), data)
710
        form = r.context['form']
711
        self.assertFalse(form.is_valid())
712

    
713
        # request email change to an existing email fails
714
        data = {'new_email_address': 'existing@synnefo.org'}
715
        r = self.client.post(ui_url('email_change'), data)
716

    
717
        self.assertContains(r, messages.EMAIL_USED)
718

    
719
        # proper email change
720
        data = {'new_email_address': 'kpap@gmail.com'}
721
        r = self.client.post(ui_url('email_change'), data, follow=True)
722
        self.assertRedirects(r, ui_url('profile'))
723
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
724
        change1 = EmailChange.objects.get()
725

    
726
        # user sees a warning
727
        r = self.client.get(ui_url('email_change'))
728
        self.assertEqual(r.status_code, 200)
729
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
730
        self.assertTrue(user.email_change_is_pending())
731

    
732
        # link was sent
733
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
734
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
735

    
736
        # proper email change
737
        data = {'new_email_address': 'kpap@yahoo.com'}
738
        r = self.client.post(ui_url('email_change'), data, follow=True)
739
        self.assertRedirects(r, ui_url('profile'))
740
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
741
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
742
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
743
        change2 = EmailChange.objects.get()
744

    
745
        r = self.client.get(change1.get_url())
746
        self.assertEquals(r.status_code, 404)
747
        self.client.logout()
748

    
749
        invalid_client = Client()
750
        r = invalid_client.post(ui_url('local?'),
751
                                {'username': 'existing@synnefo.org',
752
                                 'password': 'password'})
753
        r = invalid_client.get(change2.get_url(), follow=True)
754
        self.assertEquals(r.status_code, 403)
755

    
756
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
757
                             {'username': 'kpap@synnefo.org',
758
                              'password': 'password',
759
                              'next': change2.get_url()},
760
                             follow=True)
761
        self.assertRedirects(r, ui_url('profile'))
762
        user = r.context['request'].user
763
        self.assertEquals(user.email, 'kpap@yahoo.com')
764
        self.assertEquals(user.username, 'kpap@yahoo.com')
765

    
766
        self.client.logout()
767
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
768
                             {'username': 'kpap@synnefo.org',
769
                              'password': 'password',
770
                              'next': change2.get_url()},
771
                             follow=True)
772
        self.assertContains(r, "Please enter a correct username and password")
773
        self.assertEqual(user.emailchanges.count(), 0)
774

    
775
        AstakosUser.objects.all().delete()
776
        Group.objects.all().delete()
777

    
778

    
779
TEST_TARGETED_ID1 = \
780
    "https://idp.synnefo.org/idp/shibboleth!ZWxhIHJlIGVsYSByZSBlbGEgcmU="
781
TEST_TARGETED_ID2 = \
782
    "https://idp.synnefo.org/idp/shibboleth!ZGUgc2UgeGFsYXNlLi4uLi4uLg=="
783
TEST_TARGETED_ID3 = \
784
    "https://idp.synnefo.org/idp/shibboleth!"
785

    
786

    
787
class TestAuthProviderViews(TestCase):
788

    
789
    def tearDown(self):
790
        AstakosUser.objects.all().delete()
791

    
792
    @im_settings(IM_MODULES=['shibboleth'], MODERATION_ENABLED=False,
793
                 SHIBBOLETH_MIGRATE_EPPN=True)
794
    def migrate_to_remote_id(self):
795
        eppn_user = get_local_user("eppn@synnefo.org")
796
        tid_user = get_local_user("tid@synnefo.org")
797
        eppn_user.add_auth_provider('shibboleth', 'EPPN')
798
        tid_user.add_auth_provider('shibboleth', TEST_TARGETED_ID1)
799

    
800
        get_user = lambda r: r.context['request'].user
801

    
802
        client = ShibbolethClient()
803
        client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID2)
804
        r = client.get(ui_url('login/shibboleth?'), follow=True)
805
        self.assertTrue(get_user(r).is_authenticated())
806
        self.assertEqual(eppn_user.get_auth_provider('shibboleth').identifier,
807
                         TEST_TARGETED_ID2)
808

    
809
        client = ShibbolethClient()
810
        client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID1)
811
        r = client.get(ui_url('login/shibboleth?'), follow=True)
812
        self.assertTrue(get_user(r).is_authenticated())
813
        self.assertEqual(tid_user.get_auth_provider('shibboleth').identifier,
814
                         TEST_TARGETED_ID1)
815

    
816
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
817
                         AUTOMODERATE_POLICY=True)
818
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
819
                 HELPDESK=(('support', 'support@synnefo.org'),),
820
                 FORCE_PROFILE_UPDATE=False)
821
    def test_user(self):
822
        Profile = AuthProviderPolicyProfile
823
        Pending = PendingThirdPartyUser
824
        User = AstakosUser
825

    
826
        auth_functions.make_user("newuser@synnefo.org")
827
        get_local_user("olduser@synnefo.org")
828
        cl_olduser = ShibbolethClient()
829
        get_local_user("olduser2@synnefo.org")
830
        ShibbolethClient()
831
        cl_newuser = ShibbolethClient()
832
        cl_newuser2 = Client()
833

    
834
        academic_group, created = Group.objects.get_or_create(
835
            name='academic-login')
836
        academic_users = academic_group.user_set
837
        assert created
838
        policy_only_academic = Profile.objects.add_policy('academic_strict',
839
                                                          'shibboleth',
840
                                                          academic_group,
841
                                                          exclusive=True,
842
                                                          login=False,
843
                                                          add=False)
844

    
845
        # new academic user
846
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
847
        cl_newuser.set_tokens(remote_user="newusereppn",
848
                              mail="newuser@synnefo.org", surname="Lastname")
849
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
850
        initial = r.context['signup_form'].initial
851
        pending = Pending.objects.get()
852
        self.assertEqual(initial.get('last_name'), 'Lastname')
853
        self.assertEqual(initial.get('email'), 'newuser@synnefo.org')
854
        identifier = pending.third_party_identifier
855
        signup_data = {'third_party_identifier': identifier,
856
                       'first_name': 'Academic',
857
                       'third_party_token': pending.token,
858
                       'last_name': 'New User',
859
                       'provider': 'shibboleth'}
860
        r = cl_newuser.post(ui_url('signup'), signup_data)
861
        self.assertContains(r, "This field is required", )
862
        signup_data['email'] = 'olduser@synnefo.org'
863
        r = cl_newuser.post(ui_url('signup'), signup_data)
864
        self.assertContains(r, "already an account with this email", )
865
        signup_data['email'] = 'newuser@synnefo.org'
866
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
867
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
868
        self.assertEqual(r.status_code, 404)
869
        newuser = User.objects.get(email="newuser@synnefo.org")
870
        activation_link = newuser.get_activation_url()
871
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
872

    
873
        # new non-academic user
874
        signup_data = {'first_name': 'Non Academic',
875
                       'last_name': 'New User',
876
                       'provider': 'local',
877
                       'password1': 'password',
878
                       'password2': 'password'}
879
        signup_data['email'] = 'olduser@synnefo.org'
880
        r = cl_newuser2.post(ui_url('signup'), signup_data)
881
        self.assertContains(r, 'There is already an account with this '
882
                               'email address')
883
        signup_data['email'] = 'newuser@synnefo.org'
884
        r = cl_newuser2.post(ui_url('signup/'), signup_data)
885
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
886
        r = self.client.get(activation_link, follow=True)
887
        self.assertEqual(r.status_code, 404)
888
        newuser = User.objects.get(email="newuser@synnefo.org")
889
        self.assertTrue(newuser.activation_sent)
890

    
891
        # activation sent, user didn't open verification url so additional
892
        # registrations invalidate the previous signups.
893
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
894
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
895
        pending = Pending.objects.get()
896
        identifier = pending.third_party_identifier
897
        signup_data = {'third_party_identifier': identifier,
898
                       u'first_name': 'Academic γιούνικοουντ',
899
                       'third_party_token': pending.token,
900
                       'last_name': 'New User',
901
                       'provider': 'shibboleth'}
902
        signup_data['email'] = 'newuser@synnefo.org'
903
        r = cl_newuser.post(ui_url('signup'), signup_data)
904
        self.assertEqual(r.status_code, 302)
905
        newuser = User.objects.get(email="newuser@synnefo.org")
906
        self.assertTrue(newuser.activation_sent)
907
        activation_link = newuser.get_activation_url()
908
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
909
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
910
        self.assertRedirects(r, ui_url('landing'))
911
        helpdesk_email = astakos_settings.HELPDESK[0][1]
912
        self.assertEqual(len(get_mailbox(helpdesk_email)), 1)
913
        self.assertTrue(u'AstakosUser: Academic γιούνικοουντ' in \
914
                            get_mailbox(helpdesk_email)[0].body)
915
        newuser = User.objects.get(email="newuser@synnefo.org")
916
        self.assertEqual(newuser.is_active, True)
917
        self.assertEqual(newuser.email_verified, True)
918
        cl_newuser.logout()
919

    
920
        # cannot reactivate if suspended
921
        newuser.is_active = False
922
        newuser.save()
923
        r = cl_newuser.get(newuser.get_activation_url())
924
        newuser = User.objects.get(email="newuser@synnefo.org")
925
        self.assertFalse(newuser.is_active)
926

    
927
        # release suspension
928
        newuser.is_active = True
929
        newuser.save()
930

    
931
        cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
932
        local = auth.get_provider('local', newuser)
933
        self.assertEqual(local.get_add_policy, False)
934
        self.assertEqual(local.get_login_policy, False)
935
        r = cl_newuser.get(local.get_add_url, follow=True)
936
        self.assertRedirects(r, ui_url('profile'))
937
        self.assertContains(r, 'disabled for your')
938

    
939
        cl_olduser.login(username='olduser@synnefo.org', password="password")
940
        r = cl_olduser.get(ui_url('profile'), follow=True)
941
        self.assertEqual(r.status_code, 200)
942
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
943
        self.assertContains(r, 'Your request is missing a unique token')
944
        cl_olduser.set_tokens(remote_user="newusereppn")
945
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
946
        self.assertContains(r, 'already in use')
947
        cl_olduser.set_tokens(remote_user="oldusereppn")
948
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
949
        self.assertContains(r, 'Academic login enabled for this account')
950

    
951
        user = User.objects.get(email="olduser@synnefo.org")
952
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
953
        local_provider = user.get_auth_provider('local')
954
        self.assertEqual(shib_provider.get_remove_policy, True)
955
        self.assertEqual(local_provider.get_remove_policy, True)
956

    
957
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
958
                                                          'shibboleth',
959
                                                          academic_group,
960
                                                          remove=False)
961
        user.groups.add(academic_group)
962
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
963
        local_provider = user.get_auth_provider('local')
964
        self.assertEqual(shib_provider.get_remove_policy, False)
965
        self.assertEqual(local_provider.get_remove_policy, True)
966
        self.assertEqual(local_provider.get_login_policy, False)
967

    
968
        cl_olduser.logout()
969
        login_data = {'username': 'olduser@synnefo.org',
970
                      'password': 'password'}
971
        r = cl_olduser.post(ui_url('local'), login_data, follow=True)
972
        self.assertContains(r, "login/shibboleth'>Academic login")
973
        Group.objects.all().delete()
974

    
975

    
976
class TestAuthProvidersAPI(TestCase):
977
    """
978
    Test auth_providers module API
979
    """
980

    
981
    def tearDown(self):
982
        Group.objects.all().delete()
983

    
984
    @im_settings(IM_MODULES=['local', 'shibboleth'])
985
    def test_create(self):
986
        user = auth_functions.make_user(email="kpap@synnefo.org")
987
        user2 = auth_functions.make_user(email="kpap2@synnefo.org")
988

    
989
        module = 'shibboleth'
990
        identifier = 'SHIB_UUID'
991
        provider_params = {
992
            'affiliation': 'UNIVERSITY',
993
            'info': {'age': 27}
994
        }
995
        provider = auth.get_provider(module, user2, identifier,
996
                                     **provider_params)
997
        provider.add_to_user()
998
        provider = auth.get_provider(module, user, identifier,
999
                                     **provider_params)
1000
        provider.add_to_user()
1001
        user.email_verified = True
1002
        user.save()
1003
        self.assertRaises(Exception, provider.add_to_user)
1004
        provider = user.get_auth_provider(module, identifier)
1005
        self.assertEqual(user.get_auth_provider(
1006
            module, identifier)._instance.info.get('age'), 27)
1007

    
1008
        module = 'local'
1009
        identifier = None
1010
        provider_params = {'auth_backend': 'ldap', 'info':
1011
                          {'office': 'A1'}}
1012
        provider = auth.get_provider(module, user, identifier,
1013
                                     **provider_params)
1014
        provider.add_to_user()
1015
        self.assertFalse(provider.get_add_policy)
1016
        self.assertRaises(Exception, provider.add_to_user)
1017

    
1018
        shib = user.get_auth_provider('shibboleth',
1019
                                      'SHIB_UUID')
1020
        self.assertTrue(shib.get_remove_policy)
1021

    
1022
        local = user.get_auth_provider('local')
1023
        self.assertTrue(local.get_remove_policy)
1024

    
1025
        local.remove_from_user()
1026
        self.assertFalse(shib.get_remove_policy)
1027
        self.assertRaises(Exception, shib.remove_from_user)
1028

    
1029
        provider = user.get_auth_providers()[0]
1030
        self.assertRaises(Exception, provider.add_to_user)
1031

    
1032
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1033
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
1034
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
1035
                                                 'group2'])
1036
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
1037
                        CREATION_GROUPS_POLICY=['localgroup-create',
1038
                                                'group-create'])
1039
    def test_add_groups(self):
1040
        user = auth_functions.make_user("kpap@synnefo.org")
1041
        provider = auth.get_provider('shibboleth', user, 'test123')
1042
        provider.add_to_user()
1043
        user = AstakosUser.objects.get()
1044
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1045
                         sorted([u'group1', u'group2', u'group-create']))
1046

    
1047
        local = auth.get_provider('local', user)
1048
        local.add_to_user()
1049
        provider = user.get_auth_provider('shibboleth')
1050
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1051
        provider.remove_from_user()
1052
        user = AstakosUser.objects.get()
1053
        self.assertEqual(len(user.get_auth_providers()), 1)
1054
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1055
                         sorted([u'group-create', u'localgroup']))
1056

    
1057
        local = user.get_auth_provider('local')
1058
        self.assertRaises(Exception, local.remove_from_user)
1059
        provider = auth.get_provider('shibboleth', user, 'test123')
1060
        provider.add_to_user()
1061
        user = AstakosUser.objects.get()
1062
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1063
                         sorted([u'group-create', u'group1', u'group2',
1064
                                 u'localgroup']))
1065
        Group.objects.all().delete()
1066

    
1067
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1068
    def test_policies(self):
1069
        group_old, created = Group.objects.get_or_create(name='olduser')
1070

    
1071
        astakos_settings.MODERATION_ENABLED = True
1072
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1073
            ['academic-user']
1074
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1075
            ['google-user']
1076

    
1077
        user = auth_functions.make_user("kpap@synnefo.org")
1078
        user.groups.add(group_old)
1079
        user.add_auth_provider('local')
1080

    
1081
        user2 = auth_functions.make_user("kpap2@synnefo.org")
1082
        user2.add_auth_provider('shibboleth', identifier='shibid')
1083

    
1084
        user3 = auth_functions.make_user("kpap3@synnefo.org")
1085
        user3.groups.add(group_old)
1086
        user3.add_auth_provider('local')
1087
        user3.add_auth_provider('shibboleth', identifier='1234')
1088

    
1089
        self.assertTrue(user2.groups.get(name='academic-user'))
1090
        self.assertFalse(user2.groups.filter(name='olduser').count())
1091

    
1092
        local = auth_providers.get_provider('local')
1093
        self.assertTrue(local.get_add_policy)
1094

    
1095
        academic_group = Group.objects.get(name='academic-user')
1096
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1097
                                                     academic_group,
1098
                                                     exclusive=True,
1099
                                                     add=False,
1100
                                                     login=False)
1101
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1102
                                                     academic_group,
1103
                                                     exclusive=True,
1104
                                                     login=False,
1105
                                                     add=False)
1106
        # no duplicate entry gets created
1107
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1108

    
1109
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1110
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1111
                                                     user2,
1112
                                                     remove=False)
1113
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1114

    
1115
        local = auth_providers.get_provider('local', user2)
1116
        google = auth_providers.get_provider('google', user2)
1117
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1118
        self.assertTrue(shibboleth.get_login_policy)
1119
        self.assertFalse(shibboleth.get_remove_policy)
1120
        self.assertFalse(local.get_add_policy)
1121
        self.assertFalse(local.get_add_policy)
1122
        self.assertFalse(google.get_add_policy)
1123

    
1124
        user2.groups.remove(Group.objects.get(name='academic-user'))
1125
        self.assertTrue(local.get_add_policy)
1126
        self.assertTrue(google.get_add_policy)
1127
        user2.groups.add(Group.objects.get(name='academic-user'))
1128

    
1129
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1130
                                                     user2,
1131
                                                     exclusive=True,
1132
                                                     add=True)
1133
        self.assertTrue(local.get_add_policy)
1134
        self.assertTrue(google.get_add_policy)
1135

    
1136
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1137
        self.assertFalse(local.get_automoderate_policy)
1138
        self.assertFalse(google.get_automoderate_policy)
1139
        self.assertTrue(shibboleth.get_automoderate_policy)
1140

    
1141
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1142
                  'GOOGLE_ADD_GROUPS_POLICY']:
1143
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1144

    
1145
    @shibboleth_settings(CREATE_POLICY=True)
1146
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1147
    def test_create_http(self):
1148
        # this should be wrapped inside a transaction
1149
        user = auth_functions.make_user(email="test@test.com")
1150
        provider = auth_providers.get_provider('shibboleth', user,
1151
                                               'test@academia.test')
1152
        provider.add_to_user()
1153
        user.get_auth_provider('shibboleth', 'test@academia.test')
1154
        provider = auth_providers.get_provider('local', user)
1155
        provider.add_to_user()
1156
        user.get_auth_provider('local')
1157

    
1158
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1159
        user = auth_functions.make_user("test2@test.com")
1160
        provider = auth_providers.get_provider('shibboleth', user,
1161
                                               'test@shibboleth.com',
1162
                                               **{'info': {'name':
1163
                                                           'User Test'}})
1164
        self.assertFalse(provider.get_create_policy)
1165
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1166
        self.assertTrue(provider.get_create_policy)
1167
        academic = provider.add_to_user()
1168

    
1169
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1170
    @shibboleth_settings(LIMIT_POLICY=2)
1171
    def test_policies(self):
1172
        user = get_local_user('kpap@synnefo.org')
1173
        user.add_auth_provider('shibboleth', identifier='1234')
1174
        user.add_auth_provider('shibboleth', identifier='12345')
1175

    
1176
        # default limit is 1
1177
        local = user.get_auth_provider('local')
1178
        self.assertEqual(local.get_add_policy, False)
1179

    
1180
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1181
        academic = user.get_auth_provider('shibboleth',
1182
                                          identifier='1234')
1183
        self.assertEqual(academic.get_add_policy, False)
1184
        newacademic = auth_providers.get_provider('shibboleth', user,
1185
                                                  identifier='123456')
1186
        self.assertEqual(newacademic.get_add_policy, True)
1187
        user.add_auth_provider('shibboleth', identifier='123456')
1188
        self.assertEqual(academic.get_add_policy, False)
1189
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1190

    
1191
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1192
    @shibboleth_settings(LIMIT_POLICY=2)
1193
    def test_messages(self):
1194
        user = get_local_user('kpap@synnefo.org')
1195
        user.add_auth_provider('shibboleth', identifier='1234')
1196
        user.add_auth_provider('shibboleth', identifier='12345')
1197
        provider = auth_providers.get_provider('shibboleth')
1198
        self.assertEqual(provider.get_message('title'), 'Academic')
1199
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1200
        # regenerate messages cache
1201
        provider = auth_providers.get_provider('shibboleth')
1202
        self.assertEqual(provider.get_message('title'), 'New title')
1203
        self.assertEqual(provider.get_message('login_title'),
1204
                         'New title LOGIN')
1205
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1206
        self.assertEqual(provider.get_module_icon,
1207
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1208
        self.assertEqual(provider.get_module_medium_icon,
1209
                         settings.MEDIA_URL +
1210
                         'im/auth/icons-medium/shibboleth.png')
1211

    
1212
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1213
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1214
        self.assertEqual(provider.get_method_details_msg,
1215
                         'Account: 12345')
1216
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1217
        self.assertEqual(provider.get_method_details_msg,
1218
                         'Account: 1234')
1219

    
1220
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1221
        self.assertEqual(provider.get_not_active_msg,
1222
                         "'Academic login' is disabled.")
1223

    
1224
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1225
    @shibboleth_settings(LIMIT_POLICY=2)
1226
    def test_templates(self):
1227
        user = get_local_user('kpap@synnefo.org')
1228
        user.add_auth_provider('shibboleth', identifier='1234')
1229
        user.add_auth_provider('shibboleth', identifier='12345')
1230

    
1231
        provider = auth_providers.get_provider('shibboleth')
1232
        self.assertEqual(provider.get_template('login'),
1233
                         'im/auth/shibboleth_login.html')
1234
        provider = auth_providers.get_provider('google')
1235
        self.assertEqual(provider.get_template('login'),
1236
                         'im/auth/generic_login.html')
1237

    
1238

    
1239
class TestActivationBackend(TestCase):
1240

    
1241
    def setUp(self):
1242
        # dummy call to pass through logging middleware
1243
        self.client.get(ui_url(''))
1244

    
1245
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1246
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1247
    def test_policies(self):
1248
        backend = activation_backends.get_backend()
1249

    
1250
        # email matches RE_USER_EMAIL_PATTERNS
1251
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1252
                               is_active=False, email_verified=False)
1253
        backend.handle_verification(user1, user1.verification_code)
1254
        self.assertEqual(user1.accepted_policy, 'email')
1255

    
1256
        # manually moderated
1257
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1258
                               is_active=False, email_verified=False)
1259

    
1260
        backend.handle_verification(user2, user2.verification_code)
1261
        self.assertEqual(user2.moderated, False)
1262
        backend.handle_moderation(user2)
1263
        self.assertEqual(user2.moderated, True)
1264
        self.assertEqual(user2.accepted_policy, 'manual')
1265

    
1266
        # autoaccept due to provider automoderate policy
1267
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1268
                               is_active=False, email_verified=False)
1269
        user3.auth_providers.all().delete()
1270
        user3.add_auth_provider('shibboleth', identifier='shib123')
1271
        backend.handle_verification(user3, user3.verification_code)
1272
        self.assertEqual(user3.moderated, True)
1273
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1274

    
1275
    @im_settings(MODERATION_ENABLED=False,
1276
                 MANAGERS=(('Manager',
1277
                            'manager@synnefo.org'),),
1278
                 HELPDESK=(('Helpdesk',
1279
                            'helpdesk@synnefo.org'),),
1280
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1281
    def test_without_moderation(self):
1282
        backend = activation_backends.get_backend()
1283
        form = backend.get_signup_form('local')
1284
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1285

    
1286
        user_data = {
1287
            'email': 'kpap@synnefo.org',
1288
            'first_name': 'Kostas Papas',
1289
            'password1': '123',
1290
            'password2': '123'
1291
        }
1292
        form = backend.get_signup_form('local', user_data)
1293
        user = form.create_user()
1294
        self.assertEqual(user.is_active, False)
1295
        self.assertEqual(user.email_verified, False)
1296

    
1297
        # step one, registration
1298
        result = backend.handle_registration(user)
1299
        user = AstakosUser.objects.get()
1300
        self.assertEqual(user.is_active, False)
1301
        self.assertEqual(user.email_verified, False)
1302
        self.assertTrue(user.verification_code)
1303
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1304
        backend.send_result_notifications(result, user)
1305
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1306
        self.assertEqual(len(mail.outbox), 1)
1307

    
1308
        # step two, verify email (automatically
1309
        # moderates/accepts user, since moderation is disabled)
1310
        user = AstakosUser.objects.get()
1311
        valid_code = user.verification_code
1312

    
1313
        # test invalid code
1314
        result = backend.handle_verification(user, valid_code)
1315
        backend.send_result_notifications(result, user)
1316
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1317
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1318
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1319
        # verification + activated + greeting = 3
1320
        self.assertEqual(len(mail.outbox), 3)
1321
        user = AstakosUser.objects.get()
1322
        self.assertEqual(user.is_active, True)
1323
        self.assertEqual(user.moderated, True)
1324
        self.assertTrue(user.moderated_at)
1325
        self.assertEqual(user.email_verified, True)
1326
        self.assertTrue(user.activation_sent)
1327

    
1328
    @im_settings(MODERATION_ENABLED=True,
1329
                 MANAGERS=(('Manager',
1330
                            'manager@synnefo.org'),),
1331
                 HELPDESK=(('Helpdesk',
1332
                            'helpdesk@synnefo.org'),),
1333
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1334
    def test_with_moderation(self):
1335

    
1336
        backend = activation_backends.get_backend()
1337
        form = backend.get_signup_form('local')
1338
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1339

    
1340
        user_data = {
1341
            'email': 'kpap@synnefo.org',
1342
            'first_name': 'Kostas Papas',
1343
            'password1': '123',
1344
            'password2': '123'
1345
        }
1346
        form = backend.get_signup_form(provider='local',
1347
                                       initial_data=user_data)
1348
        user = form.create_user()
1349
        self.assertEqual(user.is_active, False)
1350
        self.assertEqual(user.email_verified, False)
1351

    
1352
        # step one, registration
1353
        result = backend.handle_registration(user)
1354
        user = AstakosUser.objects.get()
1355
        self.assertEqual(user.is_active, False)
1356
        self.assertEqual(user.email_verified, False)
1357
        self.assertTrue(user.verification_code)
1358
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1359
        backend.send_result_notifications(result, user)
1360
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1361
        self.assertEqual(len(mail.outbox), 1)
1362

    
1363
        # step two, verifying email
1364
        user = AstakosUser.objects.get()
1365
        valid_code = user.verification_code
1366
        invalid_code = user.verification_code + 'invalid'
1367

    
1368
        # test invalid code
1369
        result = backend.handle_verification(user, invalid_code)
1370
        self.assertEqual(result.status, backend.Result.ERROR)
1371
        backend.send_result_notifications(result, user)
1372
        user = AstakosUser.objects.get()
1373
        self.assertEqual(user.is_active, False)
1374
        self.assertEqual(user.moderated, False)
1375
        self.assertEqual(user.moderated_at, None)
1376
        self.assertEqual(user.email_verified, False)
1377
        self.assertTrue(user.activation_sent)
1378

    
1379
        # test valid code
1380
        user = AstakosUser.objects.get()
1381
        result = backend.handle_verification(user, valid_code)
1382
        backend.send_result_notifications(result, user)
1383
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1384
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1385
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1386
        self.assertEqual(len(mail.outbox), 2)
1387
        user = AstakosUser.objects.get()
1388
        self.assertEqual(user.moderated, False)
1389
        self.assertEqual(user.moderated_at, None)
1390
        self.assertEqual(user.email_verified, True)
1391
        self.assertTrue(user.activation_sent)
1392

    
1393
        # test code reuse
1394
        result = backend.handle_verification(user, valid_code)
1395
        self.assertEqual(result.status, backend.Result.ERROR)
1396
        user = AstakosUser.objects.get()
1397
        self.assertEqual(user.is_active, False)
1398
        self.assertEqual(user.moderated, False)
1399
        self.assertEqual(user.moderated_at, None)
1400
        self.assertEqual(user.email_verified, True)
1401
        self.assertTrue(user.activation_sent)
1402

    
1403
        # valid code on verified user
1404
        user = AstakosUser.objects.get()
1405
        valid_code = user.verification_code
1406
        result = backend.handle_verification(user, valid_code)
1407
        self.assertEqual(result.status, backend.Result.ERROR)
1408

    
1409
        # step three, moderation user
1410
        user = AstakosUser.objects.get()
1411
        result = backend.handle_moderation(user)
1412
        backend.send_result_notifications(result, user)
1413

    
1414
        user = AstakosUser.objects.get()
1415
        self.assertEqual(user.is_active, True)
1416
        self.assertEqual(user.moderated, True)
1417
        self.assertTrue(user.moderated_at)
1418
        self.assertEqual(user.email_verified, True)
1419
        self.assertTrue(user.activation_sent)
1420

    
1421

    
1422
class TestWebloginRedirect(TestCase):
1423

    
1424
    @with_settings(settings, COOKIE_DOMAIN='.astakos.synnefo.org')
1425
    def test_restricts_domains(self):
1426
        get_local_user('user1@synnefo.org')
1427

    
1428
        # next url construct helpers
1429
        weblogin = lambda nxt: reverse('weblogin') + '?next=%s' % nxt
1430
        weblogin_quoted = lambda nxt: reverse('weblogin') + '?next=%s' % \
1431
            urllib.quote_plus(nxt)
1432

    
1433
        # common cases
1434
        invalid_domain = weblogin("https://www.invaliddomain.synnefo.org")
1435
        invalid_scheme = weblogin("customscheme://localhost")
1436
        invalid_scheme_with_valid_domain = \
1437
            weblogin("http://www.invaliddomain.com")
1438
        valid_scheme = weblogin("pithos://localhost/")
1439
        # to be used in assertRedirects
1440
        valid_scheme_quoted = weblogin_quoted("pithos://localhost/")
1441

    
1442
        # not authenticated, redirects to login which contains next param with
1443
        # additional nested quoted next params
1444
        r = self.client.get(valid_scheme, follow=True)
1445
        login_redirect = reverse('login') + '?next=' + \
1446
            urllib.quote_plus("http://testserver" + valid_scheme_quoted)
1447
        self.assertRedirects(r, login_redirect)
1448

    
1449
        # authenticate client
1450
        self.client.login(username="user1@synnefo.org", password="password")
1451

    
1452
        # valid scheme
1453
        r = self.client.get(valid_scheme, follow=True)
1454
        url = r.redirect_chain[1][0]
1455
        # scheme preserved
1456
        self.assertTrue(url.startswith('pithos://localhost/'))
1457
        # redirect contains token param
1458
        params = urlparse.urlparse(url.replace('pithos', 'https'),
1459
                                   scheme='https').query
1460
        params = urlparse.parse_qs(params)
1461
        self.assertEqual(params['token'][0],
1462
                         AstakosUser.objects.get().auth_token)
1463
        # does not contain uuid
1464
        # reverted for 0.14.2 to support old pithos desktop clients
1465
        #self.assertFalse('uuid' in params)
1466

    
1467
        # invalid cases
1468
        r = self.client.get(invalid_scheme, follow=True)
1469
        self.assertEqual(r.status_code, 403)
1470

    
1471
        r = self.client.get(invalid_scheme_with_valid_domain, follow=True)
1472
        self.assertEqual(r.status_code, 403)
1473

    
1474
        r = self.client.get(invalid_domain, follow=True)
1475
        self.assertEqual(r.status_code, 403)