Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ 2c6bc262

History | View | Annotate | Download (67.8 kB)

1
# -*- coding: utf-8 -*-
2
# Copyright 2011, 2012, 2013 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
import urlparse
36
import urllib
37

    
38
from astakos.im.tests.common import *
39

    
40
ui_url = lambda url: '/' + astakos_settings.BASE_PATH + '/ui/%s' % url
41

    
42

    
43
class ShibbolethTests(TestCase):
44
    """
45
    Testing shibboleth authentication.
46
    """
47

    
48
    def setUp(self):
49
        self.client = ShibbolethClient()
50
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
51
        astakos_settings.MODERATION_ENABLED = True
52

    
53
    def tearDown(self):
54
        AstakosUser.objects.all().delete()
55

    
56
    @im_settings(FORCE_PROFILE_UPDATE=False)
57
    def test_create_account(self):
58

    
59
        client = ShibbolethClient()
60

    
61
        # shibboleth views validation
62
        # eepn required
63
        r = client.get(ui_url('login/shibboleth?'), follow=True)
64
        self.assertContains(r, messages.SHIBBOLETH_MISSING_USER_ID % {
65
            'domain': astakos_settings.BASE_HOST,
66
            'contact_email': settings.CONTACT_EMAIL
67
        })
68
        client.set_tokens(remote_user="kpapeppn", eppn="kpapeppn")
69

    
70
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
71
        # shibboleth user info required
72
        r = client.get(ui_url('login/shibboleth?'), follow=True)
73
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
74
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
75

    
76
        # shibboleth logged us in
77
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
78
                          cn="Kostas Papadimitriou",
79
                          ep_affiliation="Test Affiliation")
80
        r = client.get(ui_url('login/shibboleth?'), follow=True,
81
                       **{'HTTP_SHIB_CUSTOM_IDP_KEY': 'test'})
82
        token = PendingThirdPartyUser.objects.get().token
83
        self.assertRedirects(r, ui_url('signup?third_party_token=%s' % token))
84
        self.assertEqual(r.status_code, 200)
85

    
86
        # a new pending user created
87
        pending_user = PendingThirdPartyUser.objects.get(
88
            third_party_identifier="kpapeppn")
89
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
90
        # keep the token for future use
91
        token = pending_user.token
92
        # from now on no shibboleth headers are sent to the server
93
        client.reset_tokens()
94

    
95
        # this is the old way, it should fail, to avoid pending user take over
96
        r = client.get(ui_url('shibboleth/signup/%s' % pending_user.username))
97
        self.assertEqual(r.status_code, 404)
98

    
99
        # this is the signup unique url associated with the pending user
100
        # created
101
        r = client.get(ui_url('signup/?third_party_token=%s' % token))
102
        identifier = pending_user.third_party_identifier
103
        post_data = {'third_party_identifier': identifier,
104
                     'first_name': 'Kostas',
105
                     'third_party_token': token,
106
                     'last_name': 'Mitroglou',
107
                     'provider': 'shibboleth'}
108

    
109
        signup_url = reverse('signup')
110

    
111
        # invlid email
112
        post_data['email'] = 'kpap'
113
        r = client.post(signup_url, post_data)
114
        self.assertContains(r, token)
115

    
116
        # existing email
117
        existing_user = get_local_user('test@test.com')
118
        post_data['email'] = 'test@test.com'
119
        r = client.post(signup_url, post_data)
120
        self.assertContains(r, messages.EMAIL_USED)
121
        existing_user.delete()
122

    
123
        # and finally a valid signup
124
        post_data['email'] = 'kpap-takeover@synnefo.org'
125
        r = client.post(signup_url, post_data, follow=True)
126
        self.assertContains(r, messages.VERIFICATION_SENT)
127

    
128
        # takeover of the uverified the shibboleth identifier
129
        client = ShibbolethClient()
130
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
131
                          eppn="kpapeppn",
132
                          cn="Kostas Papadimitriou",
133
                          ep_affiliation="Test Affiliation")
134
        r = client.get(ui_url('login/shibboleth?'), follow=True,
135
                       **{'HTTP_SHIB_CUSTOM_IDP_KEY': 'test'})
136
        # a new pending user created, previous one was deleted
137
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
138
        pending_user = PendingThirdPartyUser.objects.get(
139
            third_party_identifier="kpapeppn")
140
        identifier = pending_user.third_party_identifier
141
        token = pending_user.token
142
        post_data = {'third_party_identifier': identifier,
143
                     'third_party_token': token}
144
        post_data['email'] = 'kpap@synnefo.org'
145
        r = client.post(signup_url, post_data)
146
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
147
        # previously unverified user associated with kpapeppn gets deleted
148
        user_qs = AstakosUser.objects.filter(email="kpap-takeover@synnefo.org")
149
        self.assertEqual(user_qs.count(), 0)
150

    
151
        # entires commited as expected
152
        self.assertEqual(AstakosUser.objects.count(), 1)
153
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
154

    
155
        user = AstakosUser.objects.get()
156
        provider = user.get_auth_provider("shibboleth")
157
        headers = provider.provider_details['info']['headers']
158
        self.assertEqual(headers.get('SHIB_CUSTOM_IDP_KEY'), 'test')
159

    
160
        # provider info stored
161
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
162
        self.assertEqual(provider.affiliation, 'Test Affiliation')
163
        self.assertEqual(provider.info['email'], u'kpap@synnefo.org')
164
        self.assertEqual(provider.info['eppn'], u'kpapeppn')
165
        self.assertEqual(provider.info['name'], u'Kostas Papadimitriou')
166
        self.assertTrue('headers' in provider.info)
167

    
168
        # login (not verified yet)
169
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
170
                          cn="Kostas Papadimitriou")
171
        r = client.get(ui_url("login/shibboleth?"), follow=True)
172
        self.assertContains(r, 'A pending registration exists for')
173
        self.assertNotContains(r, 'pending moderation')
174
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
175
        tmp_third_party = PendingThirdPartyUser.objects.get()
176

    
177
        # user gets verified
178
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
179
        backend = activation_backends.get_backend()
180
        activation_result = backend.verify_user(u, u.verification_code)
181
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
182
                          cn="Kostas Papadimitriou")
183
        r = client.get(ui_url("login/shibboleth?"), follow=True)
184
        self.assertNotContains(r, 'A pending registration exists for')
185
        self.assertContains(r, 'pending moderation')
186

    
187
        # temporary signup process continues. meanwhile the user have verified
188
        # her account. The signup process should fail
189
        tp = tmp_third_party
190
        post_data = {'third_party_identifier': tp.third_party_identifier,
191
                     'email': 'unsed-email@synnefo.org',
192
                     'third_party_token': tp.token}
193
        r = client.post(signup_url, post_data)
194
        self.assertEqual(r.status_code, 404)
195

    
196
        r = client.post(reverse('astakos.im.views.target.local.password_reset'),
197
                        {'email': 'kpap@synnefo.org'})
198
        self.assertContains(r, 'Classic login is not enabled for your account')
199

    
200
        # admin activates the user
201
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
202
        backend = activation_backends.get_backend()
203
        activation_result = backend.verify_user(u, u.verification_code)
204
        activation_result = backend.accept_user(u)
205
        self.assertFalse(activation_result.is_error())
206
        backend.send_result_notifications(activation_result, u)
207
        self.assertEqual(u.is_active, True)
208

    
209

    
210
        # we see our profile
211
        r = client.get(ui_url("login/shibboleth?"), follow=True)
212
        self.assertRedirects(r, ui_url('landing'))
213
        self.assertEqual(r.status_code, 200)
214

    
215
    def test_existing(self):
216
        """
217
        Test adding of third party login to an existing account
218
        """
219

    
220
        # this is our existing user
221
        existing_user = get_local_user('kpap@synnefo.org')
222
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
223
        existing_inactive.is_active = False
224
        existing_inactive.save()
225

    
226
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
227
        existing_unverified.is_active = False
228
        existing_unverified.activation_sent = None
229
        existing_unverified.email_verified = False
230
        existing_unverified.is_verified = False
231
        existing_unverified.save()
232

    
233
        client = ShibbolethClient()
234
        # shibboleth logged us in, notice that we use different email
235
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
236
                          cn="Kostas Papadimitriou", )
237
        r = client.get(ui_url("login/shibboleth?"), follow=True)
238

    
239
        # a new pending user created
240
        pending_user = PendingThirdPartyUser.objects.get()
241
        token = pending_user.token
242
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
243
        pending_key = pending_user.token
244
        client.reset_tokens()
245
        self.assertRedirects(r, ui_url("signup?third_party_token=%s" % token))
246

    
247
        form = r.context['login_form']
248
        signupdata = copy.copy(form.initial)
249
        signupdata['email'] = 'kpap@synnefo.org'
250
        signupdata['third_party_token'] = token
251
        signupdata['provider'] = 'shibboleth'
252
        signupdata.pop('id', None)
253

    
254
        # the email exists to another user
255
        r = client.post(ui_url("signup"), signupdata)
256
        self.assertContains(r, "There is already an account with this email "
257
                               "address")
258
        # change the case, still cannot create
259
        signupdata['email'] = 'KPAP@synnefo.org'
260
        r = client.post(ui_url("signup"), signupdata)
261
        self.assertContains(r, "There is already an account with this email "
262
                               "address")
263
        # inactive user
264
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
265
        r = client.post(ui_url("signup"), signupdata)
266
        self.assertContains(r, "There is already an account with this email "
267
                               "address")
268

    
269
        # unverified user, this should pass, old entry will be deleted
270
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
271
        r = client.post(ui_url("signup"), signupdata)
272

    
273
        post_data = {'password': 'password',
274
                     'username': 'kpap@synnefo.org'}
275
        r = client.post(ui_url('local'), post_data, follow=True)
276
        self.assertTrue(r.context['request'].user.is_authenticated())
277
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
278
                          cn="Kostas Papadimitriou", )
279
        r = client.get(ui_url("login/shibboleth?"), follow=True)
280
        self.assertContains(r, "enabled for this account")
281
        client.reset_tokens()
282

    
283
        user = existing_user
284
        self.assertTrue(user.has_auth_provider('shibboleth'))
285
        self.assertTrue(user.has_auth_provider('local',
286
                                               auth_backend='astakos'))
287
        client.logout()
288

    
289
        # look Ma, i can login with both my shibboleth and local account
290
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
291
                          cn="Kostas Papadimitriou")
292
        r = client.get(ui_url("login/shibboleth?"), follow=True)
293
        self.assertTrue(r.context['request'].user.is_authenticated())
294
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
295
        self.assertRedirects(r, ui_url('landing'))
296
        self.assertEqual(r.status_code, 200)
297

    
298
        user = r.context['request'].user
299
        client.logout()
300
        client.reset_tokens()
301

    
302
        # logged out
303
        r = client.get(ui_url("profile"), follow=True)
304
        self.assertFalse(r.context['request'].user.is_authenticated())
305

    
306
        # login with local account also works
307
        post_data = {'password': 'password',
308
                     'username': 'kpap@synnefo.org'}
309
        r = self.client.post(ui_url('local'), post_data, follow=True)
310
        self.assertTrue(r.context['request'].user.is_authenticated())
311
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
312
        self.assertRedirects(r, ui_url('landing'))
313
        self.assertEqual(r.status_code, 200)
314

    
315
        # cannot add the same eppn
316
        client.set_tokens(mail="secondary@shibboleth.gr",
317
                          remote_user="kpapeppn",
318
                          cn="Kostas Papadimitriou", )
319
        r = client.get(ui_url("login/shibboleth?"), follow=True)
320
        self.assertRedirects(r, ui_url('landing'))
321
        self.assertTrue(r.status_code, 200)
322
        self.assertEquals(existing_user.auth_providers.count(), 2)
323

    
324
        # only one allowed by default
325
        client.set_tokens(mail="secondary@shibboleth.gr",
326
                          remote_user="kpapeppn2",
327
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
328
        prov = auth_providers.get_provider('shibboleth')
329
        r = client.get(ui_url("login/shibboleth?"), follow=True)
330
        self.assertContains(r, "Failed to add")
331
        self.assertRedirects(r, ui_url('profile'))
332
        self.assertTrue(r.status_code, 200)
333
        self.assertEquals(existing_user.auth_providers.count(), 2)
334
        client.logout()
335
        client.reset_tokens()
336

    
337
        # cannot login with another eppn
338
        client.set_tokens(mail="kpap@synnefo.org",
339
                          remote_user="kpapeppninvalid",
340
                          cn="Kostas Papadimitriou")
341
        r = client.get(ui_url("login/shibboleth?"), follow=True)
342
        self.assertFalse(r.context['request'].user.is_authenticated())
343

    
344
        # cannot
345

    
346
        # lets remove local password
347
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
348
                                       email="kpap@synnefo.org")
349
        remove_local_url = user.get_auth_provider('local').get_remove_url
350
        remove_shibbo_url = user.get_auth_provider('shibboleth',
351
                                                   'kpapeppn').get_remove_url
352
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
353
                          cn="Kostas Papadimtriou")
354
        r = client.get(ui_url("login/shibboleth?"), follow=True)
355
        client.reset_tokens()
356

    
357
        # only POST is allowed (for CSRF protection)
358
        r = client.get(remove_local_url, follow=True)
359
        self.assertEqual(r.status_code, 405)
360

    
361
        r = client.post(remove_local_url, follow=True)
362
        # 2 providers left
363
        self.assertEqual(user.auth_providers.count(), 1)
364
        # cannot remove last provider
365
        r = client.post(remove_shibbo_url)
366
        self.assertEqual(r.status_code, 403)
367
        self.client.logout()
368

    
369
        # cannot login using local credentials (notice we use another client)
370
        post_data = {'password': 'password',
371
                     'username': 'kpap@synnefo.org'}
372
        r = self.client.post(ui_url('local'), post_data, follow=True)
373
        self.assertFalse(r.context['request'].user.is_authenticated())
374

    
375
        # we can reenable the local provider by setting a password
376
        r = client.get(ui_url("password_change"), follow=True)
377
        r = client.post(ui_url("password_change"), {'new_password1': '111',
378
                                                    'new_password2': '111'},
379
                        follow=True)
380
        user = r.context['request'].user
381
        self.assertTrue(user.has_auth_provider('local'))
382
        self.assertTrue(user.has_auth_provider('shibboleth'))
383
        self.assertTrue(user.check_password('111'))
384
        self.assertTrue(user.has_usable_password())
385

    
386
        # change password via profile form
387
        r = client.post(ui_url("profile"), {
388
            'old_password': '111',
389
            'new_password': '',
390
            'new_password2': '',
391
            'change_password': 'on',
392
        }, follow=False)
393
        self.assertEqual(r.status_code, 200)
394
        self.assertFalse(r.context['profile_form'].is_valid())
395

    
396
        self.client.logout()
397

    
398
        # now we can login
399
        post_data = {'password': '111',
400
                     'username': 'kpap@synnefo.org'}
401
        r = self.client.post(ui_url('local'), post_data, follow=True)
402
        self.assertTrue(r.context['request'].user.is_authenticated())
403

    
404
        client.reset_tokens()
405

    
406
        # we cannot take over another shibboleth identifier
407
        user2 = get_local_user('another@synnefo.org')
408
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
409
        # login
410
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
411
                          cn="Kostas Papadimitriou")
412
        r = client.get(ui_url("login/shibboleth?"), follow=True)
413
        # try to assign existing shibboleth identifier of another user
414
        client.set_tokens(mail="kpap_second@shibboleth.gr",
415
                          remote_user="existingeppn",
416
                          cn="Kostas Papadimitriou")
417
        r = client.get(ui_url("login/shibboleth?"), follow=True)
418
        self.assertContains(r, "is already in use")
419

    
420

    
421
class TestLocal(TestCase):
422

    
423
    def setUp(self):
424
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
425
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
426
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
427
        settings.ASTAKOS_MODERATION_ENABLED = True
428

    
429
    def tearDown(self):
430
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
431
        AstakosUser.objects.all().delete()
432

    
433
    @im_settings(RECAPTCHA_ENABLED=True, RATELIMIT_RETRIES_ALLOWED=3)
434
    def test_login_ratelimit(self):
435
        from django.core.cache import cache
436
        cache.clear()
437
        [cache.delete(key) for key in cache._cache.keys()]
438

    
439
        credentials = {'username': 'γιού τι έφ', 'password': 'password'}
440
        r = self.client.post(ui_url('local'), credentials, follow=True)
441
        fields = r.context['login_form'].fields.keyOrder
442
        self.assertFalse('recaptcha_challenge_field' in fields)
443
        r = self.client.post(ui_url('local'), credentials, follow=True)
444
        fields = r.context['login_form'].fields.keyOrder
445
        self.assertFalse('recaptcha_challenge_field' in fields)
446
        r = self.client.post(ui_url('local'), credentials, follow=True)
447
        fields = r.context['login_form'].fields.keyOrder
448
        self.assertTrue('recaptcha_challenge_field' in fields)
449

    
450
    def test_no_moderation(self):
451
        # disable moderation
452
        astakos_settings.MODERATION_ENABLED = False
453

    
454
        # create a new user
455
        r = self.client.get(ui_url("signup"))
456
        self.assertEqual(r.status_code, 200)
457
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
458
                'password2': 'password', 'first_name': 'Kostas',
459
                'last_name': 'Mitroglou', 'provider': 'local'}
460
        r = self.client.post(ui_url("signup"), data)
461

    
462
        # user created
463
        self.assertEqual(AstakosUser.objects.count(), 1)
464
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
465
                                       email="kpap@synnefo.org")
466
        self.assertEqual(user.username, 'kpap@synnefo.org')
467
        self.assertEqual(user.has_auth_provider('local'), True)
468
        self.assertFalse(user.is_active)
469

    
470
        # user (but not admin) gets notified
471
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
472
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
473
        astakos_settings.MODERATION_ENABLED = True
474

    
475
    def test_email_case(self):
476
        data = {
477
            'email': 'kPap@synnefo.org',
478
            'password1': '1234',
479
            'password2': '1234'
480
        }
481

    
482
        form = forms.LocalUserCreationForm(data)
483
        self.assertTrue(form.is_valid())
484
        user = form.create_user()
485

    
486
        u = AstakosUser.objects.get()
487
        self.assertEqual(u.email, 'kPap@synnefo.org')
488
        self.assertEqual(u.username, 'kpap@synnefo.org')
489
        u.is_active = True
490
        u.email_verified = True
491
        u.save()
492

    
493
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
494
        login = forms.LoginForm(data=data)
495
        self.assertTrue(login.is_valid())
496

    
497
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
498
        login = forms.LoginForm(data=data)
499
        self.assertTrue(login.is_valid())
500

    
501
        data = {
502
            'email': 'kpap@synnefo.org',
503
            'password1': '1234',
504
            'password2': '1234'
505
        }
506
        form = forms.LocalUserCreationForm(data)
507
        self.assertFalse(form.is_valid())
508

    
509
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
510
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
511
    def test_local_provider(self):
512
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
513

    
514
        # create a user
515
        r = self.client.get(ui_url("signup"))
516
        self.assertEqual(r.status_code, 200)
517
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
518
                'password2': 'password', 'first_name': 'Kostas',
519
                'last_name': 'Mitroglou', 'provider': 'local'}
520
        r = self.client.post(ui_url("signup"), data)
521

    
522
        # user created
523
        self.assertEqual(AstakosUser.objects.count(), 1)
524
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
525
                                       email="kpap@synnefo.org")
526
        self.assertEqual(user.username, 'kpap@synnefo.org')
527
        self.assertEqual(user.has_auth_provider('local'), True)
528
        self.assertFalse(user.is_active)  # not activated
529
        self.assertFalse(user.email_verified)  # not verified
530
        self.assertTrue(user.activation_sent)  # activation automatically sent
531
        self.assertFalse(user.moderated)
532
        self.assertFalse(user.email_verified)
533

    
534
        # admin gets notified and activates the user from the command line
535
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
536
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
537
                                               'password': 'password'},
538
                             follow=True)
539
        self.assertContains(r, messages.VERIFICATION_SENT)
540
        backend = activation_backends.get_backend()
541

    
542
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
543
        backend.send_user_verification_email(user)
544

    
545
        # user activation fields updated and user gets notified via email
546
        user = AstakosUser.objects.get(pk=user.pk)
547
        self.assertTrue(user.activation_sent)
548
        self.assertFalse(user.email_verified)
549
        self.assertFalse(user.is_active)
550
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
551

    
552
        # user forgot she got registered and tries to submit registration
553
        # form. Notice the upper case in email
554
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
555
                'password2': 'password', 'first_name': 'Kostas',
556
                'last_name': 'Mitroglou', 'provider': 'local'}
557
        r = self.client.post(ui_url("signup"), data, follow=True)
558
        self.assertRedirects(r, reverse('login'))
559
        self.assertContains(r, messages.VERIFICATION_SENT)
560

    
561
        user = AstakosUser.objects.get()
562
        # previous user replaced
563
        self.assertTrue(user.activation_sent)
564
        self.assertFalse(user.email_verified)
565
        self.assertFalse(user.is_active)
566
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
567

    
568
        # hmmm, email exists; lets request a password change
569
        r = self.client.get(ui_url('local/password_reset'))
570
        self.assertEqual(r.status_code, 200)
571
        data = {'email': 'kpap@synnefo.org'}
572
        r = self.client.post(ui_url('local/password_reset'), data, follow=True)
573
        # she can't because account is not active yet
574
        self.assertContains(r, 'pending activation')
575

    
576
        # moderation is enabled and an activation email has already been sent
577
        # so user can trigger resend of the activation email
578
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
579
                            follow=True)
580
        self.assertContains(r, 'has been sent to your email address.')
581
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
582

    
583
        # also she cannot login
584
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
585
        r = self.client.post(ui_url('local'), data, follow=True)
586
        self.assertContains(r, 'Resend activation')
587
        self.assertFalse(r.context['request'].user.is_authenticated())
588
        self.assertFalse('_pithos2_a' in self.client.cookies)
589

    
590
        # user sees the message and resends activation
591
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
592
                            follow=True)
593
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
594

    
595
        # logged in user cannot activate another account
596
        tmp_user = get_local_user("test_existing_user@synnefo.org")
597
        tmp_client = Client()
598
        tmp_client.login(username="test_existing_user@synnefo.org",
599
                         password="password")
600
        r = tmp_client.get(user.get_activation_url(), follow=True)
601
        self.assertContains(r, messages.LOGGED_IN_WARNING)
602

    
603
        # empty activation code is not allowed
604
        r = self.client.get(user.get_activation_url().split("?")[0],
605
                            follow=True)
606
        self.assertEqual(r.status_code, 403)
607

    
608
        r = self.client.get(user.get_activation_url(), follow=True)
609
        # previous code got invalidated
610
        self.assertRedirects(r, reverse('login'))
611
        self.assertContains(r, astakos_messages.INVALID_ACTIVATION_KEY)
612
        self.assertEqual(r.status_code, 200)
613

    
614
        user = AstakosUser.objects.get(pk=user.pk)
615
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
616
        r = self.client.get(user.get_activation_url(), follow=True)
617
        self.assertRedirects(r, reverse('login'))
618
        # user sees that account is pending approval from admins
619
        self.assertContains(r, messages.NOTIFICATION_SENT)
620
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
621

    
622
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
623
        result = backend.handle_moderation(user)
624
        backend.send_result_notifications(result, user)
625
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
626
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
627

    
628
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
629
        r = self.client.get(ui_url('profile'), follow=True)
630
        self.assertFalse(r.context['request'].user.is_authenticated())
631
        self.assertFalse('_pithos2_a' in self.client.cookies)
632
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
633

    
634
        user = AstakosUser.objects.get(pk=user.pk)
635
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
636
                                               'password': 'password'},
637
                             follow=True)
638
        # user activated and logged in, token cookie set
639
        self.assertTrue(r.context['request'].user.is_authenticated())
640
        self.assertTrue('_pithos2_a' in self.client.cookies)
641
        cookies = self.client.cookies
642
        self.assertTrue(quote(user.auth_token) in
643
                        cookies.get('_pithos2_a').value)
644
        r = self.client.get(ui_url('logout'), follow=True)
645
        r = self.client.get(ui_url(''), follow=True)
646
        self.assertRedirects(r, ui_url('login'))
647
        # user logged out, token cookie removed
648
        self.assertFalse(r.context['request'].user.is_authenticated())
649
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
650

    
651
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
652
        del self.client.cookies['_pithos2_a']
653

    
654
        # user can login
655
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
656
                                               'password': 'password'},
657
                             follow=True)
658
        self.assertTrue(r.context['request'].user.is_authenticated())
659
        self.assertTrue('_pithos2_a' in self.client.cookies)
660
        cookies = self.client.cookies
661
        self.assertTrue(quote(user.auth_token) in
662
                        cookies.get('_pithos2_a').value)
663
        self.client.get(ui_url('logout'), follow=True)
664

    
665
        # user forgot password
666
        old_pass = user.password
667
        r = self.client.get(ui_url('local/password_reset'))
668
        self.assertEqual(r.status_code, 200)
669
        r = self.client.post(ui_url('local/password_reset'),
670
                             {'email': 'kpap@synnefo.org'})
671
        self.assertEqual(r.status_code, 302)
672
        # email sent
673
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
674

    
675
        # user visits change password link
676
        user = AstakosUser.objects.get(pk=user.pk)
677
        r = self.client.get(user.get_password_reset_url())
678
        r = self.client.post(user.get_password_reset_url(),
679
                             {'new_password1': 'newpass',
680
                              'new_password2': 'newpass'})
681

    
682
        user = AstakosUser.objects.get(pk=user.pk)
683
        self.assertNotEqual(old_pass, user.password)
684

    
685
        # old pass is not usable
686
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
687
                                               'password': 'password'})
688
        self.assertContains(r, 'Please enter a correct username and password')
689
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
690
                                               'password': 'newpass'},
691
                             follow=True)
692
        self.assertTrue(r.context['request'].user.is_authenticated())
693
        self.client.logout()
694

    
695
        # tests of special local backends
696
        user = AstakosUser.objects.get(pk=user.pk)
697
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
698
        user.save()
699

    
700
        # non astakos local backends do not support password reset
701
        r = self.client.get(ui_url('local/password_reset'))
702
        self.assertEqual(r.status_code, 200)
703
        r = self.client.post(ui_url('local/password_reset'),
704
                             {'email': 'kpap@synnefo.org'})
705
        # she can't because account is not active yet
706
        self.assertContains(r, "Changing password is not")
707

    
708
    def test_fix_superuser(self):
709
        u = User.objects.create(username="dummy", email="email@example.org",
710
                                first_name="Super", last_name="User",
711
                                is_superuser=True)
712
        User.objects.create(username="dummy2", email="email2@example.org",
713
                            first_name="Other", last_name="User")
714
        fixed = auth_functions.fix_superusers()
715
        self.assertEqual(len(fixed), 1)
716
        fuser = fixed[0]
717
        self.assertEqual(fuser.email, fuser.username)
718

    
719

    
720
class UserActionsTests(TestCase):
721

    
722
    def test_email_validation(self):
723
        backend = activation_backends.get_backend()
724
        form = backend.get_signup_form('local')
725
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
726
        user_data = {
727
            'email': 'kpap@synnefo.org',
728
            'first_name': 'Kostas Papas',
729
            'password1': '123',
730
            'password2': '123'
731
        }
732
        form = backend.get_signup_form(provider='local',
733
                                       initial_data=user_data)
734
        self.assertTrue(form.is_valid())
735
        user_data['email'] = 'kpap@synnefo.org.'
736
        form = backend.get_signup_form(provider='local',
737
                                       initial_data=user_data)
738
        self.assertFalse(form.is_valid())
739

    
740
    def test_email_change(self):
741
        # to test existing email validation
742
        get_local_user('existing@synnefo.org')
743

    
744
        # local user
745
        user = get_local_user('kpap@synnefo.org')
746

    
747
        # login as kpap
748
        self.client.login(username='kpap@synnefo.org', password='password')
749
        r = self.client.get(ui_url('profile'), follow=True)
750
        user = r.context['request'].user
751
        self.assertTrue(user.is_authenticated())
752

    
753
        # change email is enabled
754
        r = self.client.get(ui_url('email_change'))
755
        self.assertEqual(r.status_code, 200)
756
        self.assertFalse(user.email_change_is_pending())
757

    
758
        # invalid email format
759
        data = {'new_email_address': 'existing@synnefo.org.'}
760
        r = self.client.post(ui_url('email_change'), data)
761
        form = r.context['form']
762
        self.assertFalse(form.is_valid())
763

    
764
        # request email change to an existing email fails
765
        data = {'new_email_address': 'existing@synnefo.org'}
766
        r = self.client.post(ui_url('email_change'), data)
767

    
768
        self.assertContains(r, messages.EMAIL_USED)
769

    
770
        # proper email change
771
        data = {'new_email_address': 'kpap@gmail.com'}
772
        r = self.client.post(ui_url('email_change'), data, follow=True)
773
        self.assertRedirects(r, ui_url('profile'))
774
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
775
        change1 = EmailChange.objects.get()
776

    
777
        # user sees a warning
778
        r = self.client.get(ui_url('email_change'))
779
        self.assertEqual(r.status_code, 200)
780
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
781
        self.assertTrue(user.email_change_is_pending())
782

    
783
        # link was sent
784
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
785
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
786

    
787
        # proper email change
788
        data = {'new_email_address': 'kpap@yahoo.com'}
789
        r = self.client.post(ui_url('email_change'), data, follow=True)
790
        self.assertRedirects(r, ui_url('profile'))
791
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
792
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
793
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
794
        change2 = EmailChange.objects.get()
795

    
796
        r = self.client.get(change1.get_url())
797
        self.assertEquals(r.status_code, 404)
798
        self.client.logout()
799

    
800
        invalid_client = Client()
801
        r = invalid_client.post(ui_url('local?'),
802
                                {'username': 'existing@synnefo.org',
803
                                 'password': 'password'})
804
        r = invalid_client.get(change2.get_url(), follow=True)
805
        self.assertEquals(r.status_code, 403)
806

    
807
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
808
                             {'username': 'kpap@synnefo.org',
809
                              'password': 'password',
810
                              'next': change2.get_url()},
811
                             follow=True)
812
        self.assertRedirects(r, ui_url('profile'))
813
        user = r.context['request'].user
814
        self.assertEquals(user.email, 'kpap@yahoo.com')
815
        self.assertEquals(user.username, 'kpap@yahoo.com')
816

    
817
        self.client.logout()
818
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
819
                             {'username': 'kpap@synnefo.org',
820
                              'password': 'password',
821
                              'next': change2.get_url()},
822
                             follow=True)
823
        self.assertContains(r, "Please enter a correct username and password")
824
        self.assertEqual(user.emailchanges.count(), 0)
825

    
826
        AstakosUser.objects.all().delete()
827
        Group.objects.all().delete()
828

    
829

    
830
TEST_TARGETED_ID1 = \
831
    "https://idp.synnefo.org/idp/shibboleth!ZWxhIHJlIGVsYSByZSBlbGEgcmU="
832
TEST_TARGETED_ID2 = \
833
    "https://idp.synnefo.org/idp/shibboleth!ZGUgc2UgeGFsYXNlLi4uLi4uLg=="
834
TEST_TARGETED_ID3 = \
835
    "https://idp.synnefo.org/idp/shibboleth!"
836

    
837

    
838
class TestAuthProviderViews(TestCase):
839

    
840
    def tearDown(self):
841
        AstakosUser.objects.all().delete()
842

    
843
    @im_settings(IM_MODULES=['shibboleth'], MODERATION_ENABLED=False,
844
                 SHIBBOLETH_MIGRATE_EPPN=True)
845
    def migrate_to_remote_id(self):
846
        eppn_user = get_local_user("eppn@synnefo.org")
847
        tid_user = get_local_user("tid@synnefo.org")
848
        eppn_user.add_auth_provider('shibboleth', 'EPPN')
849
        tid_user.add_auth_provider('shibboleth', TEST_TARGETED_ID1)
850

    
851
        get_user = lambda r: r.context['request'].user
852

    
853
        client = ShibbolethClient()
854
        client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID2)
855
        r = client.get(ui_url('login/shibboleth?'), follow=True)
856
        self.assertTrue(get_user(r).is_authenticated())
857
        self.assertEqual(eppn_user.get_auth_provider('shibboleth').identifier,
858
                         TEST_TARGETED_ID2)
859

    
860
        client = ShibbolethClient()
861
        client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID1)
862
        r = client.get(ui_url('login/shibboleth?'), follow=True)
863
        self.assertTrue(get_user(r).is_authenticated())
864
        self.assertEqual(tid_user.get_auth_provider('shibboleth').identifier,
865
                         TEST_TARGETED_ID1)
866

    
867
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
868
                         AUTOMODERATE_POLICY=True)
869
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
870
                 HELPDESK=(('support', 'support@synnefo.org'),),
871
                 FORCE_PROFILE_UPDATE=False)
872
    def test_user(self):
873
        Profile = AuthProviderPolicyProfile
874
        Pending = PendingThirdPartyUser
875
        User = AstakosUser
876

    
877
        auth_functions.make_user("newuser@synnefo.org")
878
        get_local_user("olduser@synnefo.org")
879
        cl_olduser = ShibbolethClient()
880
        get_local_user("olduser2@synnefo.org")
881
        ShibbolethClient()
882
        cl_newuser = ShibbolethClient()
883
        cl_newuser2 = Client()
884

    
885
        academic_group, created = Group.objects.get_or_create(
886
            name='academic-login')
887
        academic_users = academic_group.user_set
888
        assert created
889
        policy_only_academic = Profile.objects.add_policy('academic_strict',
890
                                                          'shibboleth',
891
                                                          academic_group,
892
                                                          exclusive=True,
893
                                                          login=False,
894
                                                          add=False)
895

    
896
        # new academic user
897
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
898
        cl_newuser.set_tokens(remote_user="newusereppn",
899
                              mail="newuser@synnefo.org", surname="Lastname")
900
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
901
        initial = r.context['signup_form'].initial
902
        pending = Pending.objects.get()
903
        self.assertEqual(initial.get('last_name'), 'Lastname')
904
        self.assertEqual(initial.get('email'), 'newuser@synnefo.org')
905
        identifier = pending.third_party_identifier
906
        signup_data = {'third_party_identifier': identifier,
907
                       'first_name': 'Academic',
908
                       'third_party_token': pending.token,
909
                       'last_name': 'New User',
910
                       'provider': 'shibboleth'}
911
        r = cl_newuser.post(ui_url('signup'), signup_data)
912
        self.assertContains(r, "This field is required", )
913
        signup_data['email'] = 'olduser@synnefo.org'
914
        r = cl_newuser.post(ui_url('signup'), signup_data)
915
        self.assertContains(r, "already an account with this email", )
916
        signup_data['email'] = 'newuser@synnefo.org'
917
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
918
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
919
        self.assertEqual(r.status_code, 404)
920
        newuser = User.objects.get(email="newuser@synnefo.org")
921
        activation_link = newuser.get_activation_url()
922
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
923

    
924
        # new non-academic user
925
        signup_data = {'first_name': 'Non Academic',
926
                       'last_name': 'New User',
927
                       'provider': 'local',
928
                       'password1': 'password',
929
                       'password2': 'password'}
930
        signup_data['email'] = 'olduser@synnefo.org'
931
        r = cl_newuser2.post(ui_url('signup'), signup_data)
932
        self.assertContains(r, 'There is already an account with this '
933
                               'email address')
934
        signup_data['email'] = 'newuser@synnefo.org'
935
        r = cl_newuser2.post(ui_url('signup/'), signup_data)
936
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
937
        r = self.client.get(activation_link, follow=True)
938
        self.assertEqual(r.status_code, 200)
939
        self.assertContains(r, astakos_messages.INVALID_ACTIVATION_KEY)
940
        newuser = User.objects.get(email="newuser@synnefo.org")
941
        self.assertTrue(newuser.activation_sent)
942

    
943
        # activation sent, user didn't open verification url so additional
944
        # registrations invalidate the previous signups.
945
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
946
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
947
        pending = Pending.objects.get()
948
        identifier = pending.third_party_identifier
949
        signup_data = {'third_party_identifier': identifier,
950
                       u'first_name': 'Academic γιούνικοουντ',
951
                       'third_party_token': pending.token,
952
                       'last_name': 'New User',
953
                       'provider': 'shibboleth'}
954
        signup_data['email'] = 'newuser@synnefo.org'
955
        r = cl_newuser.post(ui_url('signup'), signup_data)
956
        self.assertEqual(r.status_code, 302)
957
        newuser = User.objects.get(email="newuser@synnefo.org")
958
        self.assertTrue(newuser.activation_sent)
959
        activation_link = newuser.get_activation_url()
960
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
961
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
962
        self.assertRedirects(r, ui_url('landing'))
963
        helpdesk_email = astakos_settings.HELPDESK[0][1]
964
        self.assertEqual(len(get_mailbox(helpdesk_email)), 1)
965
        self.assertTrue(u'AstakosUser: Academic γιούνικοουντ' in \
966
                            get_mailbox(helpdesk_email)[0].body)
967
        newuser = User.objects.get(email="newuser@synnefo.org")
968
        self.assertEqual(newuser.is_active, True)
969
        self.assertEqual(newuser.email_verified, True)
970
        cl_newuser.logout()
971

    
972
        # cannot reactivate if suspended
973
        newuser.is_active = False
974
        newuser.save()
975
        r = cl_newuser.get(newuser.get_activation_url())
976
        newuser = User.objects.get(email="newuser@synnefo.org")
977
        self.assertFalse(newuser.is_active)
978

    
979
        # release suspension
980
        newuser.is_active = True
981
        newuser.save()
982

    
983
        cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
984
        local = auth.get_provider('local', newuser)
985
        self.assertEqual(local.get_add_policy, False)
986
        self.assertEqual(local.get_login_policy, False)
987
        r = cl_newuser.get(local.get_add_url, follow=True)
988
        self.assertRedirects(r, ui_url('profile'))
989
        self.assertContains(r, 'disabled for your')
990

    
991
        cl_olduser.login(username='olduser@synnefo.org', password="password")
992
        r = cl_olduser.get(ui_url('profile'), follow=True)
993
        self.assertEqual(r.status_code, 200)
994
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
995
        self.assertContains(r, 'Your request is missing a unique token')
996
        cl_olduser.set_tokens(remote_user="newusereppn")
997
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
998
        self.assertContains(r, 'already in use')
999
        cl_olduser.set_tokens(remote_user="oldusereppn")
1000
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
1001
        self.assertContains(r, 'Academic login enabled for this account')
1002

    
1003
        user = User.objects.get(email="olduser@synnefo.org")
1004
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
1005
        local_provider = user.get_auth_provider('local')
1006
        self.assertEqual(shib_provider.get_remove_policy, True)
1007
        self.assertEqual(local_provider.get_remove_policy, True)
1008

    
1009
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
1010
                                                          'shibboleth',
1011
                                                          academic_group,
1012
                                                          remove=False)
1013
        user.groups.add(academic_group)
1014
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
1015
        local_provider = user.get_auth_provider('local')
1016
        self.assertEqual(shib_provider.get_remove_policy, False)
1017
        self.assertEqual(local_provider.get_remove_policy, True)
1018
        self.assertEqual(local_provider.get_login_policy, False)
1019

    
1020
        cl_olduser.logout()
1021
        login_data = {'username': 'olduser@synnefo.org',
1022
                      'password': 'password'}
1023
        r = cl_olduser.post(ui_url('local'), login_data, follow=True)
1024
        self.assertContains(r, "login/shibboleth'>Academic login")
1025
        Group.objects.all().delete()
1026

    
1027

    
1028
class TestAuthProvidersAPI(TestCase):
1029
    """
1030
    Test auth_providers module API
1031
    """
1032

    
1033
    def tearDown(self):
1034
        Group.objects.all().delete()
1035

    
1036
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1037
    def test_create(self):
1038
        user = auth_functions.make_user(email="kpap@synnefo.org")
1039
        user2 = auth_functions.make_user(email="kpap2@synnefo.org")
1040

    
1041
        module = 'shibboleth'
1042
        identifier = 'SHIB_UUID'
1043
        provider_params = {
1044
            'affiliation': 'UNIVERSITY',
1045
            'info': {'age': 27}
1046
        }
1047
        provider = auth.get_provider(module, user2, identifier,
1048
                                     **provider_params)
1049
        provider.add_to_user()
1050
        provider = auth.get_provider(module, user, identifier,
1051
                                     **provider_params)
1052
        provider.add_to_user()
1053
        user.email_verified = True
1054
        user.save()
1055
        self.assertRaises(Exception, provider.add_to_user)
1056
        provider = user.get_auth_provider(module, identifier)
1057
        self.assertEqual(user.get_auth_provider(
1058
            module, identifier)._instance.info.get('age'), 27)
1059

    
1060
        module = 'local'
1061
        identifier = None
1062
        provider_params = {'auth_backend': 'ldap', 'info':
1063
                          {'office': 'A1'}}
1064
        provider = auth.get_provider(module, user, identifier,
1065
                                     **provider_params)
1066
        provider.add_to_user()
1067
        self.assertFalse(provider.get_add_policy)
1068
        self.assertRaises(Exception, provider.add_to_user)
1069

    
1070
        shib = user.get_auth_provider('shibboleth',
1071
                                      'SHIB_UUID')
1072
        self.assertTrue(shib.get_remove_policy)
1073

    
1074
        local = user.get_auth_provider('local')
1075
        self.assertTrue(local.get_remove_policy)
1076

    
1077
        local.remove_from_user()
1078
        self.assertFalse(shib.get_remove_policy)
1079
        self.assertRaises(Exception, shib.remove_from_user)
1080

    
1081
        provider = user.get_auth_providers()[0]
1082
        self.assertRaises(Exception, provider.add_to_user)
1083

    
1084
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1085
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
1086
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
1087
                                                 'group2'])
1088
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
1089
                        CREATION_GROUPS_POLICY=['localgroup-create',
1090
                                                'group-create'])
1091
    def test_add_groups(self):
1092
        user = auth_functions.make_user("kpap@synnefo.org")
1093
        provider = auth.get_provider('shibboleth', user, 'test123')
1094
        provider.add_to_user()
1095
        user = AstakosUser.objects.get()
1096
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1097
                         sorted([u'group1', u'group2', u'group-create']))
1098

    
1099
        local = auth.get_provider('local', user)
1100
        local.add_to_user()
1101
        provider = user.get_auth_provider('shibboleth')
1102
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1103
        provider.remove_from_user()
1104
        user = AstakosUser.objects.get()
1105
        self.assertEqual(len(user.get_auth_providers()), 1)
1106
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1107
                         sorted([u'group-create', u'localgroup']))
1108

    
1109
        local = user.get_auth_provider('local')
1110
        self.assertRaises(Exception, local.remove_from_user)
1111
        provider = auth.get_provider('shibboleth', user, 'test123')
1112
        provider.add_to_user()
1113
        user = AstakosUser.objects.get()
1114
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1115
                         sorted([u'group-create', u'group1', u'group2',
1116
                                 u'localgroup']))
1117
        Group.objects.all().delete()
1118

    
1119
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1120
    def test_policies(self):
1121
        group_old, created = Group.objects.get_or_create(name='olduser')
1122

    
1123
        astakos_settings.MODERATION_ENABLED = True
1124
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1125
            ['academic-user']
1126
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1127
            ['google-user']
1128

    
1129
        user = auth_functions.make_user("kpap@synnefo.org")
1130
        user.groups.add(group_old)
1131
        user.add_auth_provider('local')
1132

    
1133
        user2 = auth_functions.make_user("kpap2@synnefo.org")
1134
        user2.add_auth_provider('shibboleth', identifier='shibid')
1135

    
1136
        user3 = auth_functions.make_user("kpap3@synnefo.org")
1137
        user3.groups.add(group_old)
1138
        user3.add_auth_provider('local')
1139
        user3.add_auth_provider('shibboleth', identifier='1234')
1140

    
1141
        self.assertTrue(user2.groups.get(name='academic-user'))
1142
        self.assertFalse(user2.groups.filter(name='olduser').count())
1143

    
1144
        local = auth_providers.get_provider('local')
1145
        self.assertTrue(local.get_add_policy)
1146

    
1147
        academic_group = Group.objects.get(name='academic-user')
1148
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1149
                                                     academic_group,
1150
                                                     exclusive=True,
1151
                                                     add=False,
1152
                                                     login=False)
1153
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1154
                                                     academic_group,
1155
                                                     exclusive=True,
1156
                                                     login=False,
1157
                                                     add=False)
1158
        # no duplicate entry gets created
1159
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1160

    
1161
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1162
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1163
                                                     user2,
1164
                                                     remove=False)
1165
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1166

    
1167
        local = auth_providers.get_provider('local', user2)
1168
        google = auth_providers.get_provider('google', user2)
1169
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1170
        self.assertTrue(shibboleth.get_login_policy)
1171
        self.assertFalse(shibboleth.get_remove_policy)
1172
        self.assertFalse(local.get_add_policy)
1173
        self.assertFalse(local.get_add_policy)
1174
        self.assertFalse(google.get_add_policy)
1175

    
1176
        user2.groups.remove(Group.objects.get(name='academic-user'))
1177
        self.assertTrue(local.get_add_policy)
1178
        self.assertTrue(google.get_add_policy)
1179
        user2.groups.add(Group.objects.get(name='academic-user'))
1180

    
1181
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1182
                                                     user2,
1183
                                                     exclusive=True,
1184
                                                     add=True)
1185
        self.assertTrue(local.get_add_policy)
1186
        self.assertTrue(google.get_add_policy)
1187

    
1188
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1189
        self.assertFalse(local.get_automoderate_policy)
1190
        self.assertFalse(google.get_automoderate_policy)
1191
        self.assertTrue(shibboleth.get_automoderate_policy)
1192

    
1193
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1194
                  'GOOGLE_ADD_GROUPS_POLICY']:
1195
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1196

    
1197
    @shibboleth_settings(CREATE_POLICY=True)
1198
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1199
    def test_create_http(self):
1200
        # this should be wrapped inside a transaction
1201
        user = auth_functions.make_user(email="test@test.com")
1202
        provider = auth_providers.get_provider('shibboleth', user,
1203
                                               'test@academia.test')
1204
        provider.add_to_user()
1205
        user.get_auth_provider('shibboleth', 'test@academia.test')
1206
        provider = auth_providers.get_provider('local', user)
1207
        provider.add_to_user()
1208
        user.get_auth_provider('local')
1209

    
1210
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1211
        user = auth_functions.make_user("test2@test.com")
1212
        provider = auth_providers.get_provider('shibboleth', user,
1213
                                               'test@shibboleth.com',
1214
                                               **{'info': {'name':
1215
                                                           'User Test'}})
1216
        self.assertFalse(provider.get_create_policy)
1217
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1218
        self.assertTrue(provider.get_create_policy)
1219
        academic = provider.add_to_user()
1220

    
1221
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1222
    @shibboleth_settings(LIMIT_POLICY=2)
1223
    def test_policies(self):
1224
        user = get_local_user('kpap@synnefo.org')
1225
        user.add_auth_provider('shibboleth', identifier='1234')
1226
        user.add_auth_provider('shibboleth', identifier='12345')
1227

    
1228
        # default limit is 1
1229
        local = user.get_auth_provider('local')
1230
        self.assertEqual(local.get_add_policy, False)
1231

    
1232
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1233
        academic = user.get_auth_provider('shibboleth',
1234
                                          identifier='1234')
1235
        self.assertEqual(academic.get_add_policy, False)
1236
        newacademic = auth_providers.get_provider('shibboleth', user,
1237
                                                  identifier='123456')
1238
        self.assertEqual(newacademic.get_add_policy, True)
1239
        user.add_auth_provider('shibboleth', identifier='123456')
1240
        self.assertEqual(academic.get_add_policy, False)
1241
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1242

    
1243
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1244
    @shibboleth_settings(LIMIT_POLICY=2)
1245
    def test_messages(self):
1246
        user = get_local_user('kpap@synnefo.org')
1247
        user.add_auth_provider('shibboleth', identifier='1234')
1248
        user.add_auth_provider('shibboleth', identifier='12345')
1249
        provider = auth_providers.get_provider('shibboleth')
1250
        self.assertEqual(provider.get_message('title'), 'Academic')
1251
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1252
        # regenerate messages cache
1253
        provider = auth_providers.get_provider('shibboleth')
1254
        self.assertEqual(provider.get_message('title'), 'New title')
1255
        self.assertEqual(provider.get_message('login_title'),
1256
                         'New title LOGIN')
1257
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1258
        self.assertEqual(provider.get_module_icon,
1259
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1260
        self.assertEqual(provider.get_module_medium_icon,
1261
                         settings.MEDIA_URL +
1262
                         'im/auth/icons-medium/shibboleth.png')
1263

    
1264
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1265
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1266
        self.assertEqual(provider.get_method_details_msg,
1267
                         'Account: 12345')
1268
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1269
        self.assertEqual(provider.get_method_details_msg,
1270
                         'Account: 1234')
1271

    
1272
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1273
        self.assertEqual(provider.get_not_active_msg,
1274
                         "'Academic login' is disabled.")
1275

    
1276
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1277
    @shibboleth_settings(LIMIT_POLICY=2)
1278
    def test_templates(self):
1279
        user = get_local_user('kpap@synnefo.org')
1280
        user.add_auth_provider('shibboleth', identifier='1234')
1281
        user.add_auth_provider('shibboleth', identifier='12345')
1282

    
1283
        provider = auth_providers.get_provider('shibboleth')
1284
        self.assertEqual(provider.get_template('login'),
1285
                         'im/auth/shibboleth_login.html')
1286
        provider = auth_providers.get_provider('google')
1287
        self.assertEqual(provider.get_template('login'),
1288
                         'im/auth/generic_login.html')
1289

    
1290

    
1291
class TestActivationBackend(TestCase):
1292

    
1293
    def setUp(self):
1294
        # dummy call to pass through logging middleware
1295
        self.client.get(ui_url(''))
1296

    
1297
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1298
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1299
    def test_policies(self):
1300
        backend = activation_backends.get_backend()
1301

    
1302
        # email matches RE_USER_EMAIL_PATTERNS
1303
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1304
                               is_active=False, email_verified=False)
1305
        backend.handle_verification(user1, user1.verification_code)
1306
        self.assertEqual(user1.accepted_policy, 'email')
1307

    
1308
        # manually moderated
1309
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1310
                               is_active=False, email_verified=False)
1311

    
1312
        backend.handle_verification(user2, user2.verification_code)
1313
        self.assertEqual(user2.moderated, False)
1314
        backend.handle_moderation(user2)
1315
        self.assertEqual(user2.moderated, True)
1316
        self.assertEqual(user2.accepted_policy, 'manual')
1317

    
1318
        # autoaccept due to provider automoderate policy
1319
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1320
                               is_active=False, email_verified=False)
1321
        user3.auth_providers.all().delete()
1322
        user3.add_auth_provider('shibboleth', identifier='shib123')
1323
        backend.handle_verification(user3, user3.verification_code)
1324
        self.assertEqual(user3.moderated, True)
1325
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1326

    
1327
    @im_settings(MODERATION_ENABLED=False,
1328
                 MANAGERS=(('Manager',
1329
                            'manager@synnefo.org'),),
1330
                 HELPDESK=(('Helpdesk',
1331
                            'helpdesk@synnefo.org'),),
1332
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1333
    def test_without_moderation(self):
1334
        backend = activation_backends.get_backend()
1335
        form = backend.get_signup_form('local')
1336
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1337

    
1338
        user_data = {
1339
            'email': 'kpap@synnefo.org',
1340
            'first_name': 'Kostas Papas',
1341
            'password1': '123',
1342
            'password2': '123'
1343
        }
1344
        form = backend.get_signup_form('local', user_data)
1345
        user = form.create_user()
1346
        self.assertEqual(user.is_active, False)
1347
        self.assertEqual(user.email_verified, False)
1348

    
1349
        # step one, registration
1350
        result = backend.handle_registration(user)
1351
        user = AstakosUser.objects.get()
1352
        self.assertEqual(user.is_active, False)
1353
        self.assertEqual(user.email_verified, False)
1354
        self.assertTrue(user.verification_code)
1355
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1356
        backend.send_result_notifications(result, user)
1357
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1358
        self.assertEqual(len(mail.outbox), 1)
1359

    
1360
        # step two, verify email (automatically
1361
        # moderates/accepts user, since moderation is disabled)
1362
        user = AstakosUser.objects.get()
1363
        valid_code = user.verification_code
1364

    
1365
        # test invalid code
1366
        result = backend.handle_verification(user, valid_code)
1367
        backend.send_result_notifications(result, user)
1368
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1369
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1370
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1371
        # verification + activated + greeting = 3
1372
        self.assertEqual(len(mail.outbox), 3)
1373
        user = AstakosUser.objects.get()
1374
        self.assertEqual(user.is_active, True)
1375
        self.assertEqual(user.moderated, True)
1376
        self.assertTrue(user.moderated_at)
1377
        self.assertEqual(user.email_verified, True)
1378
        self.assertTrue(user.activation_sent)
1379

    
1380
    @im_settings(MODERATION_ENABLED=True,
1381
                 MANAGERS=(('Manager',
1382
                            'manager@synnefo.org'),),
1383
                 HELPDESK=(('Helpdesk',
1384
                            'helpdesk@synnefo.org'),),
1385
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1386
    def test_with_moderation(self):
1387

    
1388
        backend = activation_backends.get_backend()
1389
        form = backend.get_signup_form('local')
1390
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1391

    
1392
        user_data = {
1393
            'email': 'kpap@synnefo.org',
1394
            'first_name': 'Kostas Papas',
1395
            'password1': '123',
1396
            'password2': '123'
1397
        }
1398
        form = backend.get_signup_form(provider='local',
1399
                                       initial_data=user_data)
1400
        user = form.create_user()
1401
        self.assertEqual(user.is_active, False)
1402
        self.assertEqual(user.email_verified, False)
1403

    
1404
        # step one, registration
1405
        result = backend.handle_registration(user)
1406
        user = AstakosUser.objects.get()
1407
        self.assertEqual(user.is_active, False)
1408
        self.assertEqual(user.email_verified, False)
1409
        self.assertTrue(user.verification_code)
1410
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1411
        backend.send_result_notifications(result, user)
1412
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1413
        self.assertEqual(len(mail.outbox), 1)
1414

    
1415
        # step two, verifying email
1416
        user = AstakosUser.objects.get()
1417
        valid_code = user.verification_code
1418
        invalid_code = user.verification_code + 'invalid'
1419

    
1420
        # test invalid code
1421
        result = backend.handle_verification(user, invalid_code)
1422
        self.assertEqual(result.status, backend.Result.ERROR)
1423
        backend.send_result_notifications(result, user)
1424
        user = AstakosUser.objects.get()
1425
        self.assertEqual(user.is_active, False)
1426
        self.assertEqual(user.moderated, False)
1427
        self.assertEqual(user.moderated_at, None)
1428
        self.assertEqual(user.email_verified, False)
1429
        self.assertTrue(user.activation_sent)
1430

    
1431
        # test valid code
1432
        user = AstakosUser.objects.get()
1433
        result = backend.handle_verification(user, valid_code)
1434
        backend.send_result_notifications(result, user)
1435
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1436
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1437
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1438
        self.assertEqual(len(mail.outbox), 2)
1439
        user = AstakosUser.objects.get()
1440
        self.assertEqual(user.moderated, False)
1441
        self.assertEqual(user.moderated_at, None)
1442
        self.assertEqual(user.email_verified, True)
1443
        self.assertTrue(user.activation_sent)
1444

    
1445
        # test code reuse
1446
        result = backend.handle_verification(user, valid_code)
1447
        self.assertEqual(result.status, backend.Result.ERROR)
1448
        user = AstakosUser.objects.get()
1449
        self.assertEqual(user.is_active, False)
1450
        self.assertEqual(user.moderated, False)
1451
        self.assertEqual(user.moderated_at, None)
1452
        self.assertEqual(user.email_verified, True)
1453
        self.assertTrue(user.activation_sent)
1454

    
1455
        # valid code on verified user
1456
        user = AstakosUser.objects.get()
1457
        valid_code = user.verification_code
1458
        result = backend.handle_verification(user, valid_code)
1459
        self.assertEqual(result.status, backend.Result.ERROR)
1460

    
1461
        # step three, moderation user
1462
        user = AstakosUser.objects.get()
1463
        result = backend.handle_moderation(user)
1464
        backend.send_result_notifications(result, user)
1465

    
1466
        user = AstakosUser.objects.get()
1467
        self.assertEqual(user.is_active, True)
1468
        self.assertEqual(user.moderated, True)
1469
        self.assertTrue(user.moderated_at)
1470
        self.assertEqual(user.email_verified, True)
1471
        self.assertTrue(user.activation_sent)
1472

    
1473

    
1474
class TestWebloginRedirect(TestCase):
1475

    
1476
    @with_settings(settings, COOKIE_DOMAIN='.astakos.synnefo.org')
1477
    def test_restricts_domains(self):
1478
        get_local_user('user1@synnefo.org')
1479

    
1480
        # next url construct helpers
1481
        weblogin = lambda nxt: reverse('weblogin') + '?next=%s' % nxt
1482
        weblogin_quoted = lambda nxt: reverse('weblogin') + '?next=%s' % \
1483
            urllib.quote_plus(nxt)
1484

    
1485
        # common cases
1486
        invalid_domain = weblogin("https://www.invaliddomain.synnefo.org")
1487
        invalid_scheme = weblogin("customscheme://localhost")
1488
        invalid_scheme_with_valid_domain = \
1489
            weblogin("http://www.invaliddomain.com")
1490
        valid_scheme = weblogin("pithos://localhost/")
1491
        # to be used in assertRedirects
1492
        valid_scheme_quoted = weblogin_quoted("pithos://localhost/")
1493

    
1494
        # not authenticated, redirects to login which contains next param with
1495
        # additional nested quoted next params
1496
        r = self.client.get(valid_scheme, follow=True)
1497
        login_redirect = reverse('login') + '?next=' + \
1498
            urllib.quote_plus("http://testserver" + valid_scheme_quoted)
1499
        self.assertRedirects(r, login_redirect)
1500

    
1501
        # authenticate client
1502
        self.client.login(username="user1@synnefo.org", password="password")
1503

    
1504
        # valid scheme
1505
        r = self.client.get(valid_scheme, follow=True)
1506
        url = r.redirect_chain[1][0]
1507
        # scheme preserved
1508
        self.assertTrue(url.startswith('pithos://localhost/'))
1509
        # redirect contains token param
1510
        params = urlparse.urlparse(url.replace('pithos', 'https'),
1511
                                   scheme='https').query
1512
        params = urlparse.parse_qs(params)
1513
        self.assertEqual(params['token'][0],
1514
                         AstakosUser.objects.get().auth_token)
1515
        # does not contain uuid
1516
        # reverted for 0.14.2 to support old pithos desktop clients
1517
        #self.assertFalse('uuid' in params)
1518

    
1519
        # invalid cases
1520
        r = self.client.get(invalid_scheme, follow=True)
1521
        self.assertEqual(r.status_code, 403)
1522

    
1523
        r = self.client.get(invalid_scheme_with_valid_domain, follow=True)
1524
        self.assertEqual(r.status_code, 403)
1525

    
1526
        r = self.client.get(invalid_domain, follow=True)
1527
        self.assertEqual(r.status_code, 403)