Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ 95b7c3f6

History | View | Annotate | Download (67.6 kB)

1
# -*- coding: utf-8 -*-
2
# Copyright 2011, 2012, 2013 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
import urlparse
36
import urllib
37

    
38
from astakos.im.tests.common import *
39

    
40
ui_url = lambda url: '/' + astakos_settings.BASE_PATH + '/ui/%s' % url
41

    
42

    
43
class ShibbolethTests(TestCase):
44
    """
45
    Testing shibboleth authentication.
46
    """
47

    
48
    def setUp(self):
49
        self.client = ShibbolethClient()
50
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
51
        astakos_settings.MODERATION_ENABLED = True
52

    
53
    def tearDown(self):
54
        AstakosUser.objects.all().delete()
55

    
56
    @im_settings(FORCE_PROFILE_UPDATE=False)
57
    def test_create_account(self):
58

    
59
        client = ShibbolethClient()
60

    
61
        # shibboleth views validation
62
        # eepn required
63
        r = client.get(ui_url('login/shibboleth?'), follow=True)
64
        self.assertContains(r, messages.SHIBBOLETH_MISSING_USER_ID % {
65
            'domain': astakos_settings.BASE_HOST,
66
            'contact_email': settings.CONTACT_EMAIL
67
        })
68
        client.set_tokens(remote_user="kpapeppn", eppn="kpapeppn")
69

    
70
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
71
        # shibboleth user info required
72
        r = client.get(ui_url('login/shibboleth?'), follow=True)
73
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
74
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
75

    
76
        # shibboleth logged us in
77
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
78
                          cn="Kostas Papadimitriou",
79
                          ep_affiliation="Test Affiliation")
80
        r = client.get(ui_url('login/shibboleth?'), follow=True,
81
                       **{'HTTP_SHIB_CUSTOM_IDP_KEY': 'test'})
82
        token = PendingThirdPartyUser.objects.get().token
83
        self.assertRedirects(r, ui_url('signup?third_party_token=%s' % token))
84
        self.assertEqual(r.status_code, 200)
85

    
86
        # a new pending user created
87
        pending_user = PendingThirdPartyUser.objects.get(
88
            third_party_identifier="kpapeppn")
89
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
90
        # keep the token for future use
91
        token = pending_user.token
92
        # from now on no shibboleth headers are sent to the server
93
        client.reset_tokens()
94

    
95
        # this is the old way, it should fail, to avoid pending user take over
96
        r = client.get(ui_url('shibboleth/signup/%s' % pending_user.username))
97
        self.assertEqual(r.status_code, 404)
98

    
99
        # this is the signup unique url associated with the pending user
100
        # created
101
        r = client.get(ui_url('signup/?third_party_token=%s' % token))
102
        identifier = pending_user.third_party_identifier
103
        post_data = {'third_party_identifier': identifier,
104
                     'first_name': 'Kostas',
105
                     'third_party_token': token,
106
                     'last_name': 'Mitroglou',
107
                     'provider': 'shibboleth'}
108

    
109
        signup_url = reverse('signup')
110

    
111
        # invlid email
112
        post_data['email'] = 'kpap'
113
        r = client.post(signup_url, post_data)
114
        self.assertContains(r, token)
115

    
116
        # existing email
117
        existing_user = get_local_user('test@test.com')
118
        post_data['email'] = 'test@test.com'
119
        r = client.post(signup_url, post_data)
120
        self.assertContains(r, messages.EMAIL_USED)
121
        existing_user.delete()
122

    
123
        # and finally a valid signup
124
        post_data['email'] = 'kpap-takeover@synnefo.org'
125
        r = client.post(signup_url, post_data, follow=True)
126
        self.assertContains(r, messages.VERIFICATION_SENT)
127

    
128
        # takeover of the uverified the shibboleth identifier
129
        client = ShibbolethClient()
130
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
131
                          eppn="kpapeppn",
132
                          cn="Kostas Papadimitriou",
133
                          ep_affiliation="Test Affiliation")
134
        r = client.get(ui_url('login/shibboleth?'), follow=True,
135
                       **{'HTTP_SHIB_CUSTOM_IDP_KEY': 'test'})
136
        # a new pending user created, previous one was deleted
137
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
138
        pending_user = PendingThirdPartyUser.objects.get(
139
            third_party_identifier="kpapeppn")
140
        identifier = pending_user.third_party_identifier
141
        token = pending_user.token
142
        post_data = {'third_party_identifier': identifier,
143
                     'third_party_token': token}
144
        post_data['email'] = 'kpap@synnefo.org'
145
        r = client.post(signup_url, post_data)
146
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
147
        # previously unverified user associated with kpapeppn gets deleted
148
        user_qs = AstakosUser.objects.filter(email="kpap-takeover@synnefo.org")
149
        self.assertEqual(user_qs.count(), 0)
150

    
151
        # entires commited as expected
152
        self.assertEqual(AstakosUser.objects.count(), 1)
153
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
154

    
155
        user = AstakosUser.objects.get()
156
        provider = user.get_auth_provider("shibboleth")
157
        headers = provider.provider_details['info']['headers']
158
        self.assertEqual(headers.get('SHIB_CUSTOM_IDP_KEY'), 'test')
159

    
160
        # provider info stored
161
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
162
        self.assertEqual(provider.affiliation, 'Test Affiliation')
163
        self.assertEqual(provider.info['email'], u'kpap@synnefo.org')
164
        self.assertEqual(provider.info['eppn'], u'kpapeppn')
165
        self.assertEqual(provider.info['name'], u'Kostas Papadimitriou')
166
        self.assertTrue('headers' in provider.info)
167

    
168
        # login (not verified yet)
169
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
170
                          cn="Kostas Papadimitriou")
171
        r = client.get(ui_url("login/shibboleth?"), follow=True)
172
        self.assertContains(r, 'A pending registration exists for')
173
        self.assertNotContains(r, 'pending moderation')
174
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
175
        tmp_third_party = PendingThirdPartyUser.objects.get()
176

    
177
        # user gets verified
178
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
179
        backend = activation_backends.get_backend()
180
        activation_result = backend.verify_user(u, u.verification_code)
181
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
182
                          cn="Kostas Papadimitriou")
183
        r = client.get(ui_url("login/shibboleth?"), follow=True)
184
        self.assertNotContains(r, 'A pending registration exists for')
185
        self.assertContains(r, 'pending moderation')
186

    
187
        # temporary signup process continues. meanwhile the user have verified
188
        # her account. The signup process should fail
189
        tp = tmp_third_party
190
        post_data = {'third_party_identifier': tp.third_party_identifier,
191
                     'email': 'unsed-email@synnefo.org',
192
                     'third_party_token': tp.token}
193
        r = client.post(signup_url, post_data)
194
        self.assertEqual(r.status_code, 404)
195

    
196
        # admin activates the user
197
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
198
        backend = activation_backends.get_backend()
199
        activation_result = backend.verify_user(u, u.verification_code)
200
        activation_result = backend.accept_user(u)
201
        self.assertFalse(activation_result.is_error())
202
        backend.send_result_notifications(activation_result, u)
203
        self.assertEqual(u.is_active, True)
204

    
205

    
206
        # we see our profile
207
        r = client.get(ui_url("login/shibboleth?"), follow=True)
208
        self.assertRedirects(r, ui_url('landing'))
209
        self.assertEqual(r.status_code, 200)
210

    
211
    def test_existing(self):
212
        """
213
        Test adding of third party login to an existing account
214
        """
215

    
216
        # this is our existing user
217
        existing_user = get_local_user('kpap@synnefo.org')
218
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
219
        existing_inactive.is_active = False
220
        existing_inactive.save()
221

    
222
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
223
        existing_unverified.is_active = False
224
        existing_unverified.activation_sent = None
225
        existing_unverified.email_verified = False
226
        existing_unverified.is_verified = False
227
        existing_unverified.save()
228

    
229
        client = ShibbolethClient()
230
        # shibboleth logged us in, notice that we use different email
231
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
232
                          cn="Kostas Papadimitriou", )
233
        r = client.get(ui_url("login/shibboleth?"), follow=True)
234

    
235
        # a new pending user created
236
        pending_user = PendingThirdPartyUser.objects.get()
237
        token = pending_user.token
238
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
239
        pending_key = pending_user.token
240
        client.reset_tokens()
241
        self.assertRedirects(r, ui_url("signup?third_party_token=%s" % token))
242

    
243
        form = r.context['login_form']
244
        signupdata = copy.copy(form.initial)
245
        signupdata['email'] = 'kpap@synnefo.org'
246
        signupdata['third_party_token'] = token
247
        signupdata['provider'] = 'shibboleth'
248
        signupdata.pop('id', None)
249

    
250
        # the email exists to another user
251
        r = client.post(ui_url("signup"), signupdata)
252
        self.assertContains(r, "There is already an account with this email "
253
                               "address")
254
        # change the case, still cannot create
255
        signupdata['email'] = 'KPAP@synnefo.org'
256
        r = client.post(ui_url("signup"), signupdata)
257
        self.assertContains(r, "There is already an account with this email "
258
                               "address")
259
        # inactive user
260
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
261
        r = client.post(ui_url("signup"), signupdata)
262
        self.assertContains(r, "There is already an account with this email "
263
                               "address")
264

    
265
        # unverified user, this should pass, old entry will be deleted
266
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
267
        r = client.post(ui_url("signup"), signupdata)
268

    
269
        post_data = {'password': 'password',
270
                     'username': 'kpap@synnefo.org'}
271
        r = client.post(ui_url('local'), post_data, follow=True)
272
        self.assertTrue(r.context['request'].user.is_authenticated())
273
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
274
                          cn="Kostas Papadimitriou", )
275
        r = client.get(ui_url("login/shibboleth?"), follow=True)
276
        self.assertContains(r, "enabled for this account")
277
        client.reset_tokens()
278

    
279
        user = existing_user
280
        self.assertTrue(user.has_auth_provider('shibboleth'))
281
        self.assertTrue(user.has_auth_provider('local',
282
                                               auth_backend='astakos'))
283
        client.logout()
284

    
285
        # look Ma, i can login with both my shibboleth and local account
286
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
287
                          cn="Kostas Papadimitriou")
288
        r = client.get(ui_url("login/shibboleth?"), follow=True)
289
        self.assertTrue(r.context['request'].user.is_authenticated())
290
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
291
        self.assertRedirects(r, ui_url('landing'))
292
        self.assertEqual(r.status_code, 200)
293

    
294
        user = r.context['request'].user
295
        client.logout()
296
        client.reset_tokens()
297

    
298
        # logged out
299
        r = client.get(ui_url("profile"), follow=True)
300
        self.assertFalse(r.context['request'].user.is_authenticated())
301

    
302
        # login with local account also works
303
        post_data = {'password': 'password',
304
                     'username': 'kpap@synnefo.org'}
305
        r = self.client.post(ui_url('local'), post_data, follow=True)
306
        self.assertTrue(r.context['request'].user.is_authenticated())
307
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
308
        self.assertRedirects(r, ui_url('landing'))
309
        self.assertEqual(r.status_code, 200)
310

    
311
        # cannot add the same eppn
312
        client.set_tokens(mail="secondary@shibboleth.gr",
313
                          remote_user="kpapeppn",
314
                          cn="Kostas Papadimitriou", )
315
        r = client.get(ui_url("login/shibboleth?"), follow=True)
316
        self.assertRedirects(r, ui_url('landing'))
317
        self.assertTrue(r.status_code, 200)
318
        self.assertEquals(existing_user.auth_providers.count(), 2)
319

    
320
        # only one allowed by default
321
        client.set_tokens(mail="secondary@shibboleth.gr",
322
                          remote_user="kpapeppn2",
323
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
324
        prov = auth_providers.get_provider('shibboleth')
325
        r = client.get(ui_url("login/shibboleth?"), follow=True)
326
        self.assertContains(r, "Failed to add")
327
        self.assertRedirects(r, ui_url('profile'))
328
        self.assertTrue(r.status_code, 200)
329
        self.assertEquals(existing_user.auth_providers.count(), 2)
330
        client.logout()
331
        client.reset_tokens()
332

    
333
        # cannot login with another eppn
334
        client.set_tokens(mail="kpap@synnefo.org",
335
                          remote_user="kpapeppninvalid",
336
                          cn="Kostas Papadimitriou")
337
        r = client.get(ui_url("login/shibboleth?"), follow=True)
338
        self.assertFalse(r.context['request'].user.is_authenticated())
339

    
340
        # cannot
341

    
342
        # lets remove local password
343
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
344
                                       email="kpap@synnefo.org")
345
        remove_local_url = user.get_auth_provider('local').get_remove_url
346
        remove_shibbo_url = user.get_auth_provider('shibboleth',
347
                                                   'kpapeppn').get_remove_url
348
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
349
                          cn="Kostas Papadimtriou")
350
        r = client.get(ui_url("login/shibboleth?"), follow=True)
351
        client.reset_tokens()
352

    
353
        # only POST is allowed (for CSRF protection)
354
        r = client.get(remove_local_url, follow=True)
355
        self.assertEqual(r.status_code, 405)
356

    
357
        r = client.post(remove_local_url, follow=True)
358
        # 2 providers left
359
        self.assertEqual(user.auth_providers.count(), 1)
360
        # cannot remove last provider
361
        r = client.post(remove_shibbo_url)
362
        self.assertEqual(r.status_code, 403)
363
        self.client.logout()
364

    
365
        # cannot login using local credentials (notice we use another client)
366
        post_data = {'password': 'password',
367
                     'username': 'kpap@synnefo.org'}
368
        r = self.client.post(ui_url('local'), post_data, follow=True)
369
        self.assertFalse(r.context['request'].user.is_authenticated())
370

    
371
        # we can reenable the local provider by setting a password
372
        r = client.get(ui_url("password_change"), follow=True)
373
        r = client.post(ui_url("password_change"), {'new_password1': '111',
374
                                                    'new_password2': '111'},
375
                        follow=True)
376
        user = r.context['request'].user
377
        self.assertTrue(user.has_auth_provider('local'))
378
        self.assertTrue(user.has_auth_provider('shibboleth'))
379
        self.assertTrue(user.check_password('111'))
380
        self.assertTrue(user.has_usable_password())
381

    
382
        # change password via profile form
383
        r = client.post(ui_url("profile"), {
384
            'old_password': '111',
385
            'new_password': '',
386
            'new_password2': '',
387
            'change_password': 'on',
388
        }, follow=False)
389
        self.assertEqual(r.status_code, 200)
390
        self.assertFalse(r.context['profile_form'].is_valid())
391

    
392
        self.client.logout()
393

    
394
        # now we can login
395
        post_data = {'password': '111',
396
                     'username': 'kpap@synnefo.org'}
397
        r = self.client.post(ui_url('local'), post_data, follow=True)
398
        self.assertTrue(r.context['request'].user.is_authenticated())
399

    
400
        client.reset_tokens()
401

    
402
        # we cannot take over another shibboleth identifier
403
        user2 = get_local_user('another@synnefo.org')
404
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
405
        # login
406
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
407
                          cn="Kostas Papadimitriou")
408
        r = client.get(ui_url("login/shibboleth?"), follow=True)
409
        # try to assign existing shibboleth identifier of another user
410
        client.set_tokens(mail="kpap_second@shibboleth.gr",
411
                          remote_user="existingeppn",
412
                          cn="Kostas Papadimitriou")
413
        r = client.get(ui_url("login/shibboleth?"), follow=True)
414
        self.assertContains(r, "is already in use")
415

    
416

    
417
class TestLocal(TestCase):
418

    
419
    def setUp(self):
420
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
421
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
422
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
423
        settings.ASTAKOS_MODERATION_ENABLED = True
424

    
425
    def tearDown(self):
426
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
427
        AstakosUser.objects.all().delete()
428

    
429
    @im_settings(RECAPTCHA_ENABLED=True, RATELIMIT_RETRIES_ALLOWED=3)
430
    def test_login_ratelimit(self):
431
        from django.core.cache import cache
432
        cache.clear()
433
        [cache.delete(key) for key in cache._cache.keys()]
434

    
435
        credentials = {'username': 'γιού τι έφ', 'password': 'password'}
436
        r = self.client.post(ui_url('local'), credentials, follow=True)
437
        fields = r.context['login_form'].fields.keyOrder
438
        self.assertFalse('recaptcha_challenge_field' in fields)
439
        r = self.client.post(ui_url('local'), credentials, follow=True)
440
        fields = r.context['login_form'].fields.keyOrder
441
        self.assertFalse('recaptcha_challenge_field' in fields)
442
        r = self.client.post(ui_url('local'), credentials, follow=True)
443
        fields = r.context['login_form'].fields.keyOrder
444
        self.assertTrue('recaptcha_challenge_field' in fields)
445

    
446
    def test_no_moderation(self):
447
        # disable moderation
448
        astakos_settings.MODERATION_ENABLED = False
449

    
450
        # create a new user
451
        r = self.client.get(ui_url("signup"))
452
        self.assertEqual(r.status_code, 200)
453
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
454
                'password2': 'password', 'first_name': 'Kostas',
455
                'last_name': 'Mitroglou', 'provider': 'local'}
456
        r = self.client.post(ui_url("signup"), data)
457

    
458
        # user created
459
        self.assertEqual(AstakosUser.objects.count(), 1)
460
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
461
                                       email="kpap@synnefo.org")
462
        self.assertEqual(user.username, 'kpap@synnefo.org')
463
        self.assertEqual(user.has_auth_provider('local'), True)
464
        self.assertFalse(user.is_active)
465

    
466
        # user (but not admin) gets notified
467
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
468
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
469
        astakos_settings.MODERATION_ENABLED = True
470

    
471
    def test_email_case(self):
472
        data = {
473
            'email': 'kPap@synnefo.org',
474
            'password1': '1234',
475
            'password2': '1234'
476
        }
477

    
478
        form = forms.LocalUserCreationForm(data)
479
        self.assertTrue(form.is_valid())
480
        user = form.create_user()
481

    
482
        u = AstakosUser.objects.get()
483
        self.assertEqual(u.email, 'kPap@synnefo.org')
484
        self.assertEqual(u.username, 'kpap@synnefo.org')
485
        u.is_active = True
486
        u.email_verified = True
487
        u.save()
488

    
489
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
490
        login = forms.LoginForm(data=data)
491
        self.assertTrue(login.is_valid())
492

    
493
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
494
        login = forms.LoginForm(data=data)
495
        self.assertTrue(login.is_valid())
496

    
497
        data = {
498
            'email': 'kpap@synnefo.org',
499
            'password1': '1234',
500
            'password2': '1234'
501
        }
502
        form = forms.LocalUserCreationForm(data)
503
        self.assertFalse(form.is_valid())
504

    
505
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
506
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
507
    def test_local_provider(self):
508
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
509

    
510
        # create a user
511
        r = self.client.get(ui_url("signup"))
512
        self.assertEqual(r.status_code, 200)
513
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
514
                'password2': 'password', 'first_name': 'Kostas',
515
                'last_name': 'Mitroglou', 'provider': 'local'}
516
        r = self.client.post(ui_url("signup"), data)
517

    
518
        # user created
519
        self.assertEqual(AstakosUser.objects.count(), 1)
520
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
521
                                       email="kpap@synnefo.org")
522
        self.assertEqual(user.username, 'kpap@synnefo.org')
523
        self.assertEqual(user.has_auth_provider('local'), True)
524
        self.assertFalse(user.is_active)  # not activated
525
        self.assertFalse(user.email_verified)  # not verified
526
        self.assertTrue(user.activation_sent)  # activation automatically sent
527
        self.assertFalse(user.moderated)
528
        self.assertFalse(user.email_verified)
529

    
530
        # admin gets notified and activates the user from the command line
531
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
532
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
533
                                               'password': 'password'},
534
                             follow=True)
535
        self.assertContains(r, messages.VERIFICATION_SENT)
536
        backend = activation_backends.get_backend()
537

    
538
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
539
        backend.send_user_verification_email(user)
540

    
541
        # user activation fields updated and user gets notified via email
542
        user = AstakosUser.objects.get(pk=user.pk)
543
        self.assertTrue(user.activation_sent)
544
        self.assertFalse(user.email_verified)
545
        self.assertFalse(user.is_active)
546
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
547

    
548
        # user forgot she got registered and tries to submit registration
549
        # form. Notice the upper case in email
550
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
551
                'password2': 'password', 'first_name': 'Kostas',
552
                'last_name': 'Mitroglou', 'provider': 'local'}
553
        r = self.client.post(ui_url("signup"), data, follow=True)
554
        self.assertRedirects(r, reverse('login'))
555
        self.assertContains(r, messages.VERIFICATION_SENT)
556

    
557
        user = AstakosUser.objects.get()
558
        # previous user replaced
559
        self.assertTrue(user.activation_sent)
560
        self.assertFalse(user.email_verified)
561
        self.assertFalse(user.is_active)
562
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
563

    
564
        # hmmm, email exists; lets request a password change
565
        r = self.client.get(ui_url('local/password_reset'))
566
        self.assertEqual(r.status_code, 200)
567
        data = {'email': 'kpap@synnefo.org'}
568
        r = self.client.post(ui_url('local/password_reset'), data, follow=True)
569
        # she can't because account is not active yet
570
        self.assertContains(r, 'pending activation')
571

    
572
        # moderation is enabled and an activation email has already been sent
573
        # so user can trigger resend of the activation email
574
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
575
                            follow=True)
576
        self.assertContains(r, 'has been sent to your email address.')
577
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
578

    
579
        # also she cannot login
580
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
581
        r = self.client.post(ui_url('local'), data, follow=True)
582
        self.assertContains(r, 'Resend activation')
583
        self.assertFalse(r.context['request'].user.is_authenticated())
584
        self.assertFalse('_pithos2_a' in self.client.cookies)
585

    
586
        # user sees the message and resends activation
587
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
588
                            follow=True)
589
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
590

    
591
        # logged in user cannot activate another account
592
        tmp_user = get_local_user("test_existing_user@synnefo.org")
593
        tmp_client = Client()
594
        tmp_client.login(username="test_existing_user@synnefo.org",
595
                         password="password")
596
        r = tmp_client.get(user.get_activation_url(), follow=True)
597
        self.assertContains(r, messages.LOGGED_IN_WARNING)
598

    
599
        # empty activation code is not allowed
600
        r = self.client.get(user.get_activation_url().split("?")[0],
601
                            follow=True)
602
        self.assertEqual(r.status_code, 403)
603

    
604
        r = self.client.get(user.get_activation_url(), follow=True)
605
        # previous code got invalidated
606
        self.assertRedirects(r, reverse('login'))
607
        self.assertContains(r, astakos_messages.INVALID_ACTIVATION_KEY)
608
        self.assertEqual(r.status_code, 200)
609

    
610
        user = AstakosUser.objects.get(pk=user.pk)
611
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
612
        r = self.client.get(user.get_activation_url(), follow=True)
613
        self.assertRedirects(r, reverse('login'))
614
        # user sees that account is pending approval from admins
615
        self.assertContains(r, messages.NOTIFICATION_SENT)
616
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
617

    
618
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
619
        result = backend.handle_moderation(user)
620
        backend.send_result_notifications(result, user)
621
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
622
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
623

    
624
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
625
        r = self.client.get(ui_url('profile'), follow=True)
626
        self.assertFalse(r.context['request'].user.is_authenticated())
627
        self.assertFalse('_pithos2_a' in self.client.cookies)
628
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
629

    
630
        user = AstakosUser.objects.get(pk=user.pk)
631
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
632
                                               'password': 'password'},
633
                             follow=True)
634
        # user activated and logged in, token cookie set
635
        self.assertTrue(r.context['request'].user.is_authenticated())
636
        self.assertTrue('_pithos2_a' in self.client.cookies)
637
        cookies = self.client.cookies
638
        self.assertTrue(quote(user.auth_token) in
639
                        cookies.get('_pithos2_a').value)
640
        r = self.client.get(ui_url('logout'), follow=True)
641
        r = self.client.get(ui_url(''), follow=True)
642
        self.assertRedirects(r, ui_url('login'))
643
        # user logged out, token cookie removed
644
        self.assertFalse(r.context['request'].user.is_authenticated())
645
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
646

    
647
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
648
        del self.client.cookies['_pithos2_a']
649

    
650
        # user can login
651
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
652
                                               'password': 'password'},
653
                             follow=True)
654
        self.assertTrue(r.context['request'].user.is_authenticated())
655
        self.assertTrue('_pithos2_a' in self.client.cookies)
656
        cookies = self.client.cookies
657
        self.assertTrue(quote(user.auth_token) in
658
                        cookies.get('_pithos2_a').value)
659
        self.client.get(ui_url('logout'), follow=True)
660

    
661
        # user forgot password
662
        old_pass = user.password
663
        r = self.client.get(ui_url('local/password_reset'))
664
        self.assertEqual(r.status_code, 200)
665
        r = self.client.post(ui_url('local/password_reset'),
666
                             {'email': 'kpap@synnefo.org'})
667
        self.assertEqual(r.status_code, 302)
668
        # email sent
669
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
670

    
671
        # user visits change password link
672
        user = AstakosUser.objects.get(pk=user.pk)
673
        r = self.client.get(user.get_password_reset_url())
674
        r = self.client.post(user.get_password_reset_url(),
675
                             {'new_password1': 'newpass',
676
                              'new_password2': 'newpass'})
677

    
678
        user = AstakosUser.objects.get(pk=user.pk)
679
        self.assertNotEqual(old_pass, user.password)
680

    
681
        # old pass is not usable
682
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
683
                                               'password': 'password'})
684
        self.assertContains(r, 'Please enter a correct username and password')
685
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
686
                                               'password': 'newpass'},
687
                             follow=True)
688
        self.assertTrue(r.context['request'].user.is_authenticated())
689
        self.client.logout()
690

    
691
        # tests of special local backends
692
        user = AstakosUser.objects.get(pk=user.pk)
693
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
694
        user.save()
695

    
696
        # non astakos local backends do not support password reset
697
        r = self.client.get(ui_url('local/password_reset'))
698
        self.assertEqual(r.status_code, 200)
699
        r = self.client.post(ui_url('local/password_reset'),
700
                             {'email': 'kpap@synnefo.org'})
701
        # she can't because account is not active yet
702
        self.assertContains(r, "Changing password is not")
703

    
704
    def test_fix_superuser(self):
705
        u = User.objects.create(username="dummy", email="email@example.org",
706
                                first_name="Super", last_name="User",
707
                                is_superuser=True)
708
        User.objects.create(username="dummy2", email="email2@example.org",
709
                            first_name="Other", last_name="User")
710
        fixed = auth_functions.fix_superusers()
711
        self.assertEqual(len(fixed), 1)
712
        fuser = fixed[0]
713
        self.assertEqual(fuser.email, fuser.username)
714

    
715

    
716
class UserActionsTests(TestCase):
717

    
718
    def test_email_validation(self):
719
        backend = activation_backends.get_backend()
720
        form = backend.get_signup_form('local')
721
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
722
        user_data = {
723
            'email': 'kpap@synnefo.org',
724
            'first_name': 'Kostas Papas',
725
            'password1': '123',
726
            'password2': '123'
727
        }
728
        form = backend.get_signup_form(provider='local',
729
                                       initial_data=user_data)
730
        self.assertTrue(form.is_valid())
731
        user_data['email'] = 'kpap@synnefo.org.'
732
        form = backend.get_signup_form(provider='local',
733
                                       initial_data=user_data)
734
        self.assertFalse(form.is_valid())
735

    
736
    def test_email_change(self):
737
        # to test existing email validation
738
        get_local_user('existing@synnefo.org')
739

    
740
        # local user
741
        user = get_local_user('kpap@synnefo.org')
742

    
743
        # login as kpap
744
        self.client.login(username='kpap@synnefo.org', password='password')
745
        r = self.client.get(ui_url('profile'), follow=True)
746
        user = r.context['request'].user
747
        self.assertTrue(user.is_authenticated())
748

    
749
        # change email is enabled
750
        r = self.client.get(ui_url('email_change'))
751
        self.assertEqual(r.status_code, 200)
752
        self.assertFalse(user.email_change_is_pending())
753

    
754
        # invalid email format
755
        data = {'new_email_address': 'existing@synnefo.org.'}
756
        r = self.client.post(ui_url('email_change'), data)
757
        form = r.context['form']
758
        self.assertFalse(form.is_valid())
759

    
760
        # request email change to an existing email fails
761
        data = {'new_email_address': 'existing@synnefo.org'}
762
        r = self.client.post(ui_url('email_change'), data)
763

    
764
        self.assertContains(r, messages.EMAIL_USED)
765

    
766
        # proper email change
767
        data = {'new_email_address': 'kpap@gmail.com'}
768
        r = self.client.post(ui_url('email_change'), data, follow=True)
769
        self.assertRedirects(r, ui_url('profile'))
770
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
771
        change1 = EmailChange.objects.get()
772

    
773
        # user sees a warning
774
        r = self.client.get(ui_url('email_change'))
775
        self.assertEqual(r.status_code, 200)
776
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
777
        self.assertTrue(user.email_change_is_pending())
778

    
779
        # link was sent
780
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
781
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
782

    
783
        # proper email change
784
        data = {'new_email_address': 'kpap@yahoo.com'}
785
        r = self.client.post(ui_url('email_change'), data, follow=True)
786
        self.assertRedirects(r, ui_url('profile'))
787
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
788
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
789
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
790
        change2 = EmailChange.objects.get()
791

    
792
        r = self.client.get(change1.get_url())
793
        self.assertEquals(r.status_code, 404)
794
        self.client.logout()
795

    
796
        invalid_client = Client()
797
        r = invalid_client.post(ui_url('local?'),
798
                                {'username': 'existing@synnefo.org',
799
                                 'password': 'password'})
800
        r = invalid_client.get(change2.get_url(), follow=True)
801
        self.assertEquals(r.status_code, 403)
802

    
803
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
804
                             {'username': 'kpap@synnefo.org',
805
                              'password': 'password',
806
                              'next': change2.get_url()},
807
                             follow=True)
808
        self.assertRedirects(r, ui_url('profile'))
809
        user = r.context['request'].user
810
        self.assertEquals(user.email, 'kpap@yahoo.com')
811
        self.assertEquals(user.username, 'kpap@yahoo.com')
812

    
813
        self.client.logout()
814
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
815
                             {'username': 'kpap@synnefo.org',
816
                              'password': 'password',
817
                              'next': change2.get_url()},
818
                             follow=True)
819
        self.assertContains(r, "Please enter a correct username and password")
820
        self.assertEqual(user.emailchanges.count(), 0)
821

    
822
        AstakosUser.objects.all().delete()
823
        Group.objects.all().delete()
824

    
825

    
826
TEST_TARGETED_ID1 = \
827
    "https://idp.synnefo.org/idp/shibboleth!ZWxhIHJlIGVsYSByZSBlbGEgcmU="
828
TEST_TARGETED_ID2 = \
829
    "https://idp.synnefo.org/idp/shibboleth!ZGUgc2UgeGFsYXNlLi4uLi4uLg=="
830
TEST_TARGETED_ID3 = \
831
    "https://idp.synnefo.org/idp/shibboleth!"
832

    
833

    
834
class TestAuthProviderViews(TestCase):
835

    
836
    def tearDown(self):
837
        AstakosUser.objects.all().delete()
838

    
839
    @im_settings(IM_MODULES=['shibboleth'], MODERATION_ENABLED=False,
840
                 SHIBBOLETH_MIGRATE_EPPN=True)
841
    def migrate_to_remote_id(self):
842
        eppn_user = get_local_user("eppn@synnefo.org")
843
        tid_user = get_local_user("tid@synnefo.org")
844
        eppn_user.add_auth_provider('shibboleth', 'EPPN')
845
        tid_user.add_auth_provider('shibboleth', TEST_TARGETED_ID1)
846

    
847
        get_user = lambda r: r.context['request'].user
848

    
849
        client = ShibbolethClient()
850
        client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID2)
851
        r = client.get(ui_url('login/shibboleth?'), follow=True)
852
        self.assertTrue(get_user(r).is_authenticated())
853
        self.assertEqual(eppn_user.get_auth_provider('shibboleth').identifier,
854
                         TEST_TARGETED_ID2)
855

    
856
        client = ShibbolethClient()
857
        client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID1)
858
        r = client.get(ui_url('login/shibboleth?'), follow=True)
859
        self.assertTrue(get_user(r).is_authenticated())
860
        self.assertEqual(tid_user.get_auth_provider('shibboleth').identifier,
861
                         TEST_TARGETED_ID1)
862

    
863
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
864
                         AUTOMODERATE_POLICY=True)
865
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
866
                 HELPDESK=(('support', 'support@synnefo.org'),),
867
                 FORCE_PROFILE_UPDATE=False)
868
    def test_user(self):
869
        Profile = AuthProviderPolicyProfile
870
        Pending = PendingThirdPartyUser
871
        User = AstakosUser
872

    
873
        auth_functions.make_user("newuser@synnefo.org")
874
        get_local_user("olduser@synnefo.org")
875
        cl_olduser = ShibbolethClient()
876
        get_local_user("olduser2@synnefo.org")
877
        ShibbolethClient()
878
        cl_newuser = ShibbolethClient()
879
        cl_newuser2 = Client()
880

    
881
        academic_group, created = Group.objects.get_or_create(
882
            name='academic-login')
883
        academic_users = academic_group.user_set
884
        assert created
885
        policy_only_academic = Profile.objects.add_policy('academic_strict',
886
                                                          'shibboleth',
887
                                                          academic_group,
888
                                                          exclusive=True,
889
                                                          login=False,
890
                                                          add=False)
891

    
892
        # new academic user
893
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
894
        cl_newuser.set_tokens(remote_user="newusereppn",
895
                              mail="newuser@synnefo.org", surname="Lastname")
896
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
897
        initial = r.context['signup_form'].initial
898
        pending = Pending.objects.get()
899
        self.assertEqual(initial.get('last_name'), 'Lastname')
900
        self.assertEqual(initial.get('email'), 'newuser@synnefo.org')
901
        identifier = pending.third_party_identifier
902
        signup_data = {'third_party_identifier': identifier,
903
                       'first_name': 'Academic',
904
                       'third_party_token': pending.token,
905
                       'last_name': 'New User',
906
                       'provider': 'shibboleth'}
907
        r = cl_newuser.post(ui_url('signup'), signup_data)
908
        self.assertContains(r, "This field is required", )
909
        signup_data['email'] = 'olduser@synnefo.org'
910
        r = cl_newuser.post(ui_url('signup'), signup_data)
911
        self.assertContains(r, "already an account with this email", )
912
        signup_data['email'] = 'newuser@synnefo.org'
913
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
914
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
915
        self.assertEqual(r.status_code, 404)
916
        newuser = User.objects.get(email="newuser@synnefo.org")
917
        activation_link = newuser.get_activation_url()
918
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
919

    
920
        # new non-academic user
921
        signup_data = {'first_name': 'Non Academic',
922
                       'last_name': 'New User',
923
                       'provider': 'local',
924
                       'password1': 'password',
925
                       'password2': 'password'}
926
        signup_data['email'] = 'olduser@synnefo.org'
927
        r = cl_newuser2.post(ui_url('signup'), signup_data)
928
        self.assertContains(r, 'There is already an account with this '
929
                               'email address')
930
        signup_data['email'] = 'newuser@synnefo.org'
931
        r = cl_newuser2.post(ui_url('signup/'), signup_data)
932
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
933
        r = self.client.get(activation_link, follow=True)
934
        self.assertEqual(r.status_code, 404)
935
        newuser = User.objects.get(email="newuser@synnefo.org")
936
        self.assertTrue(newuser.activation_sent)
937

    
938
        # activation sent, user didn't open verification url so additional
939
        # registrations invalidate the previous signups.
940
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
941
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
942
        pending = Pending.objects.get()
943
        identifier = pending.third_party_identifier
944
        signup_data = {'third_party_identifier': identifier,
945
                       u'first_name': 'Academic γιούνικοουντ',
946
                       'third_party_token': pending.token,
947
                       'last_name': 'New User',
948
                       'provider': 'shibboleth'}
949
        signup_data['email'] = 'newuser@synnefo.org'
950
        r = cl_newuser.post(ui_url('signup'), signup_data)
951
        self.assertEqual(r.status_code, 302)
952
        newuser = User.objects.get(email="newuser@synnefo.org")
953
        self.assertTrue(newuser.activation_sent)
954
        activation_link = newuser.get_activation_url()
955
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
956
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
957
        self.assertRedirects(r, ui_url('landing'))
958
        helpdesk_email = astakos_settings.HELPDESK[0][1]
959
        self.assertEqual(len(get_mailbox(helpdesk_email)), 1)
960
        self.assertTrue(u'AstakosUser: Academic γιούνικοουντ' in \
961
                            get_mailbox(helpdesk_email)[0].body)
962
        newuser = User.objects.get(email="newuser@synnefo.org")
963
        self.assertEqual(newuser.is_active, True)
964
        self.assertEqual(newuser.email_verified, True)
965
        cl_newuser.logout()
966

    
967
        # cannot reactivate if suspended
968
        newuser.is_active = False
969
        newuser.save()
970
        r = cl_newuser.get(newuser.get_activation_url())
971
        newuser = User.objects.get(email="newuser@synnefo.org")
972
        self.assertFalse(newuser.is_active)
973

    
974
        # release suspension
975
        newuser.is_active = True
976
        newuser.save()
977

    
978
        cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
979
        local = auth.get_provider('local', newuser)
980
        self.assertEqual(local.get_add_policy, False)
981
        self.assertEqual(local.get_login_policy, False)
982
        r = cl_newuser.get(local.get_add_url, follow=True)
983
        self.assertRedirects(r, ui_url('profile'))
984
        self.assertContains(r, 'disabled for your')
985

    
986
        cl_olduser.login(username='olduser@synnefo.org', password="password")
987
        r = cl_olduser.get(ui_url('profile'), follow=True)
988
        self.assertEqual(r.status_code, 200)
989
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
990
        self.assertContains(r, 'Your request is missing a unique token')
991
        cl_olduser.set_tokens(remote_user="newusereppn")
992
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
993
        self.assertContains(r, 'already in use')
994
        cl_olduser.set_tokens(remote_user="oldusereppn")
995
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
996
        self.assertContains(r, 'Academic login enabled for this account')
997

    
998
        user = User.objects.get(email="olduser@synnefo.org")
999
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
1000
        local_provider = user.get_auth_provider('local')
1001
        self.assertEqual(shib_provider.get_remove_policy, True)
1002
        self.assertEqual(local_provider.get_remove_policy, True)
1003

    
1004
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
1005
                                                          'shibboleth',
1006
                                                          academic_group,
1007
                                                          remove=False)
1008
        user.groups.add(academic_group)
1009
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
1010
        local_provider = user.get_auth_provider('local')
1011
        self.assertEqual(shib_provider.get_remove_policy, False)
1012
        self.assertEqual(local_provider.get_remove_policy, True)
1013
        self.assertEqual(local_provider.get_login_policy, False)
1014

    
1015
        cl_olduser.logout()
1016
        login_data = {'username': 'olduser@synnefo.org',
1017
                      'password': 'password'}
1018
        r = cl_olduser.post(ui_url('local'), login_data, follow=True)
1019
        self.assertContains(r, "login/shibboleth'>Academic login")
1020
        Group.objects.all().delete()
1021

    
1022

    
1023
class TestAuthProvidersAPI(TestCase):
1024
    """
1025
    Test auth_providers module API
1026
    """
1027

    
1028
    def tearDown(self):
1029
        Group.objects.all().delete()
1030

    
1031
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1032
    def test_create(self):
1033
        user = auth_functions.make_user(email="kpap@synnefo.org")
1034
        user2 = auth_functions.make_user(email="kpap2@synnefo.org")
1035

    
1036
        module = 'shibboleth'
1037
        identifier = 'SHIB_UUID'
1038
        provider_params = {
1039
            'affiliation': 'UNIVERSITY',
1040
            'info': {'age': 27}
1041
        }
1042
        provider = auth.get_provider(module, user2, identifier,
1043
                                     **provider_params)
1044
        provider.add_to_user()
1045
        provider = auth.get_provider(module, user, identifier,
1046
                                     **provider_params)
1047
        provider.add_to_user()
1048
        user.email_verified = True
1049
        user.save()
1050
        self.assertRaises(Exception, provider.add_to_user)
1051
        provider = user.get_auth_provider(module, identifier)
1052
        self.assertEqual(user.get_auth_provider(
1053
            module, identifier)._instance.info.get('age'), 27)
1054

    
1055
        module = 'local'
1056
        identifier = None
1057
        provider_params = {'auth_backend': 'ldap', 'info':
1058
                          {'office': 'A1'}}
1059
        provider = auth.get_provider(module, user, identifier,
1060
                                     **provider_params)
1061
        provider.add_to_user()
1062
        self.assertFalse(provider.get_add_policy)
1063
        self.assertRaises(Exception, provider.add_to_user)
1064

    
1065
        shib = user.get_auth_provider('shibboleth',
1066
                                      'SHIB_UUID')
1067
        self.assertTrue(shib.get_remove_policy)
1068

    
1069
        local = user.get_auth_provider('local')
1070
        self.assertTrue(local.get_remove_policy)
1071

    
1072
        local.remove_from_user()
1073
        self.assertFalse(shib.get_remove_policy)
1074
        self.assertRaises(Exception, shib.remove_from_user)
1075

    
1076
        provider = user.get_auth_providers()[0]
1077
        self.assertRaises(Exception, provider.add_to_user)
1078

    
1079
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1080
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
1081
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
1082
                                                 'group2'])
1083
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
1084
                        CREATION_GROUPS_POLICY=['localgroup-create',
1085
                                                'group-create'])
1086
    def test_add_groups(self):
1087
        user = auth_functions.make_user("kpap@synnefo.org")
1088
        provider = auth.get_provider('shibboleth', user, 'test123')
1089
        provider.add_to_user()
1090
        user = AstakosUser.objects.get()
1091
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1092
                         sorted([u'group1', u'group2', u'group-create']))
1093

    
1094
        local = auth.get_provider('local', user)
1095
        local.add_to_user()
1096
        provider = user.get_auth_provider('shibboleth')
1097
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1098
        provider.remove_from_user()
1099
        user = AstakosUser.objects.get()
1100
        self.assertEqual(len(user.get_auth_providers()), 1)
1101
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1102
                         sorted([u'group-create', u'localgroup']))
1103

    
1104
        local = user.get_auth_provider('local')
1105
        self.assertRaises(Exception, local.remove_from_user)
1106
        provider = auth.get_provider('shibboleth', user, 'test123')
1107
        provider.add_to_user()
1108
        user = AstakosUser.objects.get()
1109
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1110
                         sorted([u'group-create', u'group1', u'group2',
1111
                                 u'localgroup']))
1112
        Group.objects.all().delete()
1113

    
1114
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1115
    def test_policies(self):
1116
        group_old, created = Group.objects.get_or_create(name='olduser')
1117

    
1118
        astakos_settings.MODERATION_ENABLED = True
1119
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1120
            ['academic-user']
1121
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1122
            ['google-user']
1123

    
1124
        user = auth_functions.make_user("kpap@synnefo.org")
1125
        user.groups.add(group_old)
1126
        user.add_auth_provider('local')
1127

    
1128
        user2 = auth_functions.make_user("kpap2@synnefo.org")
1129
        user2.add_auth_provider('shibboleth', identifier='shibid')
1130

    
1131
        user3 = auth_functions.make_user("kpap3@synnefo.org")
1132
        user3.groups.add(group_old)
1133
        user3.add_auth_provider('local')
1134
        user3.add_auth_provider('shibboleth', identifier='1234')
1135

    
1136
        self.assertTrue(user2.groups.get(name='academic-user'))
1137
        self.assertFalse(user2.groups.filter(name='olduser').count())
1138

    
1139
        local = auth_providers.get_provider('local')
1140
        self.assertTrue(local.get_add_policy)
1141

    
1142
        academic_group = Group.objects.get(name='academic-user')
1143
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1144
                                                     academic_group,
1145
                                                     exclusive=True,
1146
                                                     add=False,
1147
                                                     login=False)
1148
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1149
                                                     academic_group,
1150
                                                     exclusive=True,
1151
                                                     login=False,
1152
                                                     add=False)
1153
        # no duplicate entry gets created
1154
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1155

    
1156
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1157
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1158
                                                     user2,
1159
                                                     remove=False)
1160
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1161

    
1162
        local = auth_providers.get_provider('local', user2)
1163
        google = auth_providers.get_provider('google', user2)
1164
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1165
        self.assertTrue(shibboleth.get_login_policy)
1166
        self.assertFalse(shibboleth.get_remove_policy)
1167
        self.assertFalse(local.get_add_policy)
1168
        self.assertFalse(local.get_add_policy)
1169
        self.assertFalse(google.get_add_policy)
1170

    
1171
        user2.groups.remove(Group.objects.get(name='academic-user'))
1172
        self.assertTrue(local.get_add_policy)
1173
        self.assertTrue(google.get_add_policy)
1174
        user2.groups.add(Group.objects.get(name='academic-user'))
1175

    
1176
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1177
                                                     user2,
1178
                                                     exclusive=True,
1179
                                                     add=True)
1180
        self.assertTrue(local.get_add_policy)
1181
        self.assertTrue(google.get_add_policy)
1182

    
1183
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1184
        self.assertFalse(local.get_automoderate_policy)
1185
        self.assertFalse(google.get_automoderate_policy)
1186
        self.assertTrue(shibboleth.get_automoderate_policy)
1187

    
1188
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1189
                  'GOOGLE_ADD_GROUPS_POLICY']:
1190
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1191

    
1192
    @shibboleth_settings(CREATE_POLICY=True)
1193
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1194
    def test_create_http(self):
1195
        # this should be wrapped inside a transaction
1196
        user = auth_functions.make_user(email="test@test.com")
1197
        provider = auth_providers.get_provider('shibboleth', user,
1198
                                               'test@academia.test')
1199
        provider.add_to_user()
1200
        user.get_auth_provider('shibboleth', 'test@academia.test')
1201
        provider = auth_providers.get_provider('local', user)
1202
        provider.add_to_user()
1203
        user.get_auth_provider('local')
1204

    
1205
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1206
        user = auth_functions.make_user("test2@test.com")
1207
        provider = auth_providers.get_provider('shibboleth', user,
1208
                                               'test@shibboleth.com',
1209
                                               **{'info': {'name':
1210
                                                           'User Test'}})
1211
        self.assertFalse(provider.get_create_policy)
1212
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1213
        self.assertTrue(provider.get_create_policy)
1214
        academic = provider.add_to_user()
1215

    
1216
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1217
    @shibboleth_settings(LIMIT_POLICY=2)
1218
    def test_policies(self):
1219
        user = get_local_user('kpap@synnefo.org')
1220
        user.add_auth_provider('shibboleth', identifier='1234')
1221
        user.add_auth_provider('shibboleth', identifier='12345')
1222

    
1223
        # default limit is 1
1224
        local = user.get_auth_provider('local')
1225
        self.assertEqual(local.get_add_policy, False)
1226

    
1227
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1228
        academic = user.get_auth_provider('shibboleth',
1229
                                          identifier='1234')
1230
        self.assertEqual(academic.get_add_policy, False)
1231
        newacademic = auth_providers.get_provider('shibboleth', user,
1232
                                                  identifier='123456')
1233
        self.assertEqual(newacademic.get_add_policy, True)
1234
        user.add_auth_provider('shibboleth', identifier='123456')
1235
        self.assertEqual(academic.get_add_policy, False)
1236
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1237

    
1238
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1239
    @shibboleth_settings(LIMIT_POLICY=2)
1240
    def test_messages(self):
1241
        user = get_local_user('kpap@synnefo.org')
1242
        user.add_auth_provider('shibboleth', identifier='1234')
1243
        user.add_auth_provider('shibboleth', identifier='12345')
1244
        provider = auth_providers.get_provider('shibboleth')
1245
        self.assertEqual(provider.get_message('title'), 'Academic')
1246
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1247
        # regenerate messages cache
1248
        provider = auth_providers.get_provider('shibboleth')
1249
        self.assertEqual(provider.get_message('title'), 'New title')
1250
        self.assertEqual(provider.get_message('login_title'),
1251
                         'New title LOGIN')
1252
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1253
        self.assertEqual(provider.get_module_icon,
1254
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1255
        self.assertEqual(provider.get_module_medium_icon,
1256
                         settings.MEDIA_URL +
1257
                         'im/auth/icons-medium/shibboleth.png')
1258

    
1259
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1260
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1261
        self.assertEqual(provider.get_method_details_msg,
1262
                         'Account: 12345')
1263
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1264
        self.assertEqual(provider.get_method_details_msg,
1265
                         'Account: 1234')
1266

    
1267
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1268
        self.assertEqual(provider.get_not_active_msg,
1269
                         "'Academic login' is disabled.")
1270

    
1271
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1272
    @shibboleth_settings(LIMIT_POLICY=2)
1273
    def test_templates(self):
1274
        user = get_local_user('kpap@synnefo.org')
1275
        user.add_auth_provider('shibboleth', identifier='1234')
1276
        user.add_auth_provider('shibboleth', identifier='12345')
1277

    
1278
        provider = auth_providers.get_provider('shibboleth')
1279
        self.assertEqual(provider.get_template('login'),
1280
                         'im/auth/shibboleth_login.html')
1281
        provider = auth_providers.get_provider('google')
1282
        self.assertEqual(provider.get_template('login'),
1283
                         'im/auth/generic_login.html')
1284

    
1285

    
1286
class TestActivationBackend(TestCase):
1287

    
1288
    def setUp(self):
1289
        # dummy call to pass through logging middleware
1290
        self.client.get(ui_url(''))
1291

    
1292
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1293
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1294
    def test_policies(self):
1295
        backend = activation_backends.get_backend()
1296

    
1297
        # email matches RE_USER_EMAIL_PATTERNS
1298
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1299
                               is_active=False, email_verified=False)
1300
        backend.handle_verification(user1, user1.verification_code)
1301
        self.assertEqual(user1.accepted_policy, 'email')
1302

    
1303
        # manually moderated
1304
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1305
                               is_active=False, email_verified=False)
1306

    
1307
        backend.handle_verification(user2, user2.verification_code)
1308
        self.assertEqual(user2.moderated, False)
1309
        backend.handle_moderation(user2)
1310
        self.assertEqual(user2.moderated, True)
1311
        self.assertEqual(user2.accepted_policy, 'manual')
1312

    
1313
        # autoaccept due to provider automoderate policy
1314
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1315
                               is_active=False, email_verified=False)
1316
        user3.auth_providers.all().delete()
1317
        user3.add_auth_provider('shibboleth', identifier='shib123')
1318
        backend.handle_verification(user3, user3.verification_code)
1319
        self.assertEqual(user3.moderated, True)
1320
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1321

    
1322
    @im_settings(MODERATION_ENABLED=False,
1323
                 MANAGERS=(('Manager',
1324
                            'manager@synnefo.org'),),
1325
                 HELPDESK=(('Helpdesk',
1326
                            'helpdesk@synnefo.org'),),
1327
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1328
    def test_without_moderation(self):
1329
        backend = activation_backends.get_backend()
1330
        form = backend.get_signup_form('local')
1331
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1332

    
1333
        user_data = {
1334
            'email': 'kpap@synnefo.org',
1335
            'first_name': 'Kostas Papas',
1336
            'password1': '123',
1337
            'password2': '123'
1338
        }
1339
        form = backend.get_signup_form('local', user_data)
1340
        user = form.create_user()
1341
        self.assertEqual(user.is_active, False)
1342
        self.assertEqual(user.email_verified, False)
1343

    
1344
        # step one, registration
1345
        result = backend.handle_registration(user)
1346
        user = AstakosUser.objects.get()
1347
        self.assertEqual(user.is_active, False)
1348
        self.assertEqual(user.email_verified, False)
1349
        self.assertTrue(user.verification_code)
1350
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1351
        backend.send_result_notifications(result, user)
1352
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1353
        self.assertEqual(len(mail.outbox), 1)
1354

    
1355
        # step two, verify email (automatically
1356
        # moderates/accepts user, since moderation is disabled)
1357
        user = AstakosUser.objects.get()
1358
        valid_code = user.verification_code
1359

    
1360
        # test invalid code
1361
        result = backend.handle_verification(user, valid_code)
1362
        backend.send_result_notifications(result, user)
1363
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1364
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1365
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1366
        # verification + activated + greeting = 3
1367
        self.assertEqual(len(mail.outbox), 3)
1368
        user = AstakosUser.objects.get()
1369
        self.assertEqual(user.is_active, True)
1370
        self.assertEqual(user.moderated, True)
1371
        self.assertTrue(user.moderated_at)
1372
        self.assertEqual(user.email_verified, True)
1373
        self.assertTrue(user.activation_sent)
1374

    
1375
    @im_settings(MODERATION_ENABLED=True,
1376
                 MANAGERS=(('Manager',
1377
                            'manager@synnefo.org'),),
1378
                 HELPDESK=(('Helpdesk',
1379
                            'helpdesk@synnefo.org'),),
1380
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1381
    def test_with_moderation(self):
1382

    
1383
        backend = activation_backends.get_backend()
1384
        form = backend.get_signup_form('local')
1385
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1386

    
1387
        user_data = {
1388
            'email': 'kpap@synnefo.org',
1389
            'first_name': 'Kostas Papas',
1390
            'password1': '123',
1391
            'password2': '123'
1392
        }
1393
        form = backend.get_signup_form(provider='local',
1394
                                       initial_data=user_data)
1395
        user = form.create_user()
1396
        self.assertEqual(user.is_active, False)
1397
        self.assertEqual(user.email_verified, False)
1398

    
1399
        # step one, registration
1400
        result = backend.handle_registration(user)
1401
        user = AstakosUser.objects.get()
1402
        self.assertEqual(user.is_active, False)
1403
        self.assertEqual(user.email_verified, False)
1404
        self.assertTrue(user.verification_code)
1405
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1406
        backend.send_result_notifications(result, user)
1407
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1408
        self.assertEqual(len(mail.outbox), 1)
1409

    
1410
        # step two, verifying email
1411
        user = AstakosUser.objects.get()
1412
        valid_code = user.verification_code
1413
        invalid_code = user.verification_code + 'invalid'
1414

    
1415
        # test invalid code
1416
        result = backend.handle_verification(user, invalid_code)
1417
        self.assertEqual(result.status, backend.Result.ERROR)
1418
        backend.send_result_notifications(result, user)
1419
        user = AstakosUser.objects.get()
1420
        self.assertEqual(user.is_active, False)
1421
        self.assertEqual(user.moderated, False)
1422
        self.assertEqual(user.moderated_at, None)
1423
        self.assertEqual(user.email_verified, False)
1424
        self.assertTrue(user.activation_sent)
1425

    
1426
        # test valid code
1427
        user = AstakosUser.objects.get()
1428
        result = backend.handle_verification(user, valid_code)
1429
        backend.send_result_notifications(result, user)
1430
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1431
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1432
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1433
        self.assertEqual(len(mail.outbox), 2)
1434
        user = AstakosUser.objects.get()
1435
        self.assertEqual(user.moderated, False)
1436
        self.assertEqual(user.moderated_at, None)
1437
        self.assertEqual(user.email_verified, True)
1438
        self.assertTrue(user.activation_sent)
1439

    
1440
        # test code reuse
1441
        result = backend.handle_verification(user, valid_code)
1442
        self.assertEqual(result.status, backend.Result.ERROR)
1443
        user = AstakosUser.objects.get()
1444
        self.assertEqual(user.is_active, False)
1445
        self.assertEqual(user.moderated, False)
1446
        self.assertEqual(user.moderated_at, None)
1447
        self.assertEqual(user.email_verified, True)
1448
        self.assertTrue(user.activation_sent)
1449

    
1450
        # valid code on verified user
1451
        user = AstakosUser.objects.get()
1452
        valid_code = user.verification_code
1453
        result = backend.handle_verification(user, valid_code)
1454
        self.assertEqual(result.status, backend.Result.ERROR)
1455

    
1456
        # step three, moderation user
1457
        user = AstakosUser.objects.get()
1458
        result = backend.handle_moderation(user)
1459
        backend.send_result_notifications(result, user)
1460

    
1461
        user = AstakosUser.objects.get()
1462
        self.assertEqual(user.is_active, True)
1463
        self.assertEqual(user.moderated, True)
1464
        self.assertTrue(user.moderated_at)
1465
        self.assertEqual(user.email_verified, True)
1466
        self.assertTrue(user.activation_sent)
1467

    
1468

    
1469
class TestWebloginRedirect(TestCase):
1470

    
1471
    @with_settings(settings, COOKIE_DOMAIN='.astakos.synnefo.org')
1472
    def test_restricts_domains(self):
1473
        get_local_user('user1@synnefo.org')
1474

    
1475
        # next url construct helpers
1476
        weblogin = lambda nxt: reverse('weblogin') + '?next=%s' % nxt
1477
        weblogin_quoted = lambda nxt: reverse('weblogin') + '?next=%s' % \
1478
            urllib.quote_plus(nxt)
1479

    
1480
        # common cases
1481
        invalid_domain = weblogin("https://www.invaliddomain.synnefo.org")
1482
        invalid_scheme = weblogin("customscheme://localhost")
1483
        invalid_scheme_with_valid_domain = \
1484
            weblogin("http://www.invaliddomain.com")
1485
        valid_scheme = weblogin("pithos://localhost/")
1486
        # to be used in assertRedirects
1487
        valid_scheme_quoted = weblogin_quoted("pithos://localhost/")
1488

    
1489
        # not authenticated, redirects to login which contains next param with
1490
        # additional nested quoted next params
1491
        r = self.client.get(valid_scheme, follow=True)
1492
        login_redirect = reverse('login') + '?next=' + \
1493
            urllib.quote_plus("http://testserver" + valid_scheme_quoted)
1494
        self.assertRedirects(r, login_redirect)
1495

    
1496
        # authenticate client
1497
        self.client.login(username="user1@synnefo.org", password="password")
1498

    
1499
        # valid scheme
1500
        r = self.client.get(valid_scheme, follow=True)
1501
        url = r.redirect_chain[1][0]
1502
        # scheme preserved
1503
        self.assertTrue(url.startswith('pithos://localhost/'))
1504
        # redirect contains token param
1505
        params = urlparse.urlparse(url.replace('pithos', 'https'),
1506
                                   scheme='https').query
1507
        params = urlparse.parse_qs(params)
1508
        self.assertEqual(params['token'][0],
1509
                         AstakosUser.objects.get().auth_token)
1510
        # does not contain uuid
1511
        # reverted for 0.14.2 to support old pithos desktop clients
1512
        #self.assertFalse('uuid' in params)
1513

    
1514
        # invalid cases
1515
        r = self.client.get(invalid_scheme, follow=True)
1516
        self.assertEqual(r.status_code, 403)
1517

    
1518
        r = self.client.get(invalid_scheme_with_valid_domain, follow=True)
1519
        self.assertEqual(r.status_code, 403)
1520

    
1521
        r = self.client.get(invalid_domain, follow=True)
1522
        self.assertEqual(r.status_code, 403)