Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ c1f65a1e

History | View | Annotate | Download (61.2 kB)

1
# -*- coding: utf-8 -*-
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
import urlparse
36
import urllib
37

    
38
from astakos.im.tests.common import *
39

    
40
ui_url = lambda url: '/' + astakos_settings.BASE_PATH + '/ui/%s' % url
41

    
42

    
43
class ShibbolethTests(TestCase):
44
    """
45
    Testing shibboleth authentication.
46
    """
47

    
48
    def setUp(self):
49
        self.client = ShibbolethClient()
50
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
51
        astakos_settings.MODERATION_ENABLED = True
52

    
53
    def tearDown(self):
54
        AstakosUser.objects.all().delete()
55

    
56
    @im_settings(FORCE_PROFILE_UPDATE=False)
57
    def test_create_account(self):
58

    
59
        client = ShibbolethClient()
60

    
61
        # shibboleth views validation
62
        # eepn required
63
        r = client.get(ui_url('login/shibboleth?'), follow=True)
64
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
65
            'domain': astakos_settings.BASE_HOST,
66
            'contact_email': settings.CONTACT_EMAIL
67
        })
68
        client.set_tokens(eppn="kpapeppn")
69

    
70
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
71
        # shibboleth user info required
72
        r = client.get(ui_url('login/shibboleth?'), follow=True)
73
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
74
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
75

    
76
        # shibboleth logged us in
77
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
78
                          cn="Kostas Papadimitriou",
79
                          ep_affiliation="Test Affiliation")
80
        r = client.get(ui_url('login/shibboleth?'), follow=True)
81
        token = PendingThirdPartyUser.objects.get().token
82
        self.assertRedirects(r, ui_url('signup?third_party_token=%s' % token))
83
        self.assertEqual(r.status_code, 200)
84

    
85
        # a new pending user created
86
        pending_user = PendingThirdPartyUser.objects.get(
87
            third_party_identifier="kpapeppn")
88
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
89
        # keep the token for future use
90
        token = pending_user.token
91
        # from now on no shibboleth headers are sent to the server
92
        client.reset_tokens()
93

    
94
        # this is the old way, it should fail, to avoid pending user take over
95
        r = client.get(ui_url('shibboleth/signup/%s' % pending_user.username))
96
        self.assertEqual(r.status_code, 404)
97

    
98
        # this is the signup unique url associated with the pending user
99
        # created
100
        r = client.get(ui_url('signup/?third_party_token=%s' % token))
101
        identifier = pending_user.third_party_identifier
102
        post_data = {'third_party_identifier': identifier,
103
                     'first_name': 'Kostas',
104
                     'third_party_token': token,
105
                     'last_name': 'Mitroglou',
106
                     'provider': 'shibboleth'}
107

    
108
        signup_url = reverse('signup')
109

    
110
        # invlid email
111
        post_data['email'] = 'kpap'
112
        r = client.post(signup_url, post_data)
113
        self.assertContains(r, token)
114

    
115
        # existing email
116
        existing_user = get_local_user('test@test.com')
117
        post_data['email'] = 'test@test.com'
118
        r = client.post(signup_url, post_data)
119
        self.assertContains(r, messages.EMAIL_USED)
120
        existing_user.delete()
121

    
122
        # and finally a valid signup
123
        post_data['email'] = 'kpap@synnefo.org'
124
        r = client.post(signup_url, post_data, follow=True)
125
        self.assertContains(r, messages.VERIFICATION_SENT)
126

    
127
        # entires commited as expected
128
        self.assertEqual(AstakosUser.objects.count(), 1)
129
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
130
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
131

    
132
        # provider info stored
133
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
134
        self.assertEqual(provider.affiliation, 'Test Affiliation')
135
        self.assertEqual(provider.info['email'], u'kpap@synnefo.org')
136
        self.assertEqual(provider.info['eppn'], u'kpapeppn')
137
        self.assertEqual(provider.info['name'], u'Kostas Papadimitriou')
138
        self.assertTrue('headers' in provider.info)
139

    
140
        # login (not activated yet)
141
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
142
                          cn="Kostas Papadimitriou")
143
        r = client.get(ui_url("login/shibboleth?"), follow=True)
144
        self.assertContains(r, 'is pending moderation')
145

    
146
        # admin activates the user
147
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
148
        backend = activation_backends.get_backend()
149
        activation_result = backend.verify_user(u, u.verification_code)
150
        activation_result = backend.accept_user(u)
151
        self.assertFalse(activation_result.is_error())
152
        backend.send_result_notifications(activation_result, u)
153
        self.assertEqual(u.is_active, True)
154

    
155
        # we see our profile
156
        r = client.get(ui_url("login/shibboleth?"), follow=True)
157
        self.assertRedirects(r, ui_url('landing'))
158
        self.assertEqual(r.status_code, 200)
159

    
160
    def test_existing(self):
161
        """
162
        Test adding of third party login to an existing account
163
        """
164

    
165
        # this is our existing user
166
        existing_user = get_local_user('kpap@synnefo.org')
167
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
168
        existing_inactive.is_active = False
169
        existing_inactive.save()
170

    
171
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
172
        existing_unverified.is_active = False
173
        existing_unverified.activation_sent = None
174
        existing_unverified.email_verified = False
175
        existing_unverified.is_verified = False
176
        existing_unverified.save()
177

    
178
        client = ShibbolethClient()
179
        # shibboleth logged us in, notice that we use different email
180
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
181
                          cn="Kostas Papadimitriou", )
182
        r = client.get(ui_url("login/shibboleth?"), follow=True)
183

    
184
        # a new pending user created
185
        pending_user = PendingThirdPartyUser.objects.get()
186
        token = pending_user.token
187
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
188
        pending_key = pending_user.token
189
        client.reset_tokens()
190
        self.assertRedirects(r, ui_url("signup?third_party_token=%s" % token))
191

    
192
        form = r.context['login_form']
193
        signupdata = copy.copy(form.initial)
194
        signupdata['email'] = 'kpap@synnefo.org'
195
        signupdata['third_party_token'] = token
196
        signupdata['provider'] = 'shibboleth'
197
        signupdata.pop('id', None)
198

    
199
        # the email exists to another user
200
        r = client.post(ui_url("signup"), signupdata)
201
        self.assertContains(r, "There is already an account with this email "
202
                               "address")
203
        # change the case, still cannot create
204
        signupdata['email'] = 'KPAP@synnefo.org'
205
        r = client.post(ui_url("signup"), signupdata)
206
        self.assertContains(r, "There is already an account with this email "
207
                               "address")
208
        # inactive user
209
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
210
        r = client.post(ui_url("signup"), signupdata)
211
        self.assertContains(r, "There is already an account with this email "
212
                               "address")
213

    
214
        # unverified user, this should pass, old entry will be deleted
215
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
216
        r = client.post(ui_url("signup"), signupdata)
217

    
218
        post_data = {'password': 'password',
219
                     'username': 'kpap@synnefo.org'}
220
        r = client.post(ui_url('local'), post_data, follow=True)
221
        self.assertTrue(r.context['request'].user.is_authenticated())
222
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
223
                          cn="Kostas Papadimitriou", )
224
        r = client.get(ui_url("login/shibboleth?"), follow=True)
225
        self.assertContains(r, "enabled for this account")
226
        client.reset_tokens()
227

    
228
        user = existing_user
229
        self.assertTrue(user.has_auth_provider('shibboleth'))
230
        self.assertTrue(user.has_auth_provider('local',
231
                                               auth_backend='astakos'))
232
        client.logout()
233

    
234
        # look Ma, i can login with both my shibboleth and local account
235
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
236
                          cn="Kostas Papadimitriou")
237
        r = client.get(ui_url("login/shibboleth?"), follow=True)
238
        self.assertTrue(r.context['request'].user.is_authenticated())
239
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
240
        self.assertRedirects(r, ui_url('landing'))
241
        self.assertEqual(r.status_code, 200)
242
        client.logout()
243
        client.reset_tokens()
244

    
245
        # logged out
246
        r = client.get(ui_url("profile"), follow=True)
247
        self.assertFalse(r.context['request'].user.is_authenticated())
248

    
249
        # login with local account also works
250
        post_data = {'password': 'password',
251
                     'username': 'kpap@synnefo.org'}
252
        r = self.client.post(ui_url('local'), post_data, follow=True)
253
        self.assertTrue(r.context['request'].user.is_authenticated())
254
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
255
        self.assertRedirects(r, ui_url('landing'))
256
        self.assertEqual(r.status_code, 200)
257

    
258
        # cannot add the same eppn
259
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
260
                          cn="Kostas Papadimitriou", )
261
        r = client.get(ui_url("login/shibboleth?"), follow=True)
262
        self.assertRedirects(r, ui_url('landing'))
263
        self.assertTrue(r.status_code, 200)
264
        self.assertEquals(existing_user.auth_providers.count(), 2)
265

    
266
        # only one allowed by default
267
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
268
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
269
        prov = auth_providers.get_provider('shibboleth')
270
        r = client.get(ui_url("login/shibboleth?"), follow=True)
271
        self.assertContains(r, "Failed to add")
272
        self.assertRedirects(r, ui_url('profile'))
273
        self.assertTrue(r.status_code, 200)
274
        self.assertEquals(existing_user.auth_providers.count(), 2)
275
        client.logout()
276
        client.reset_tokens()
277

    
278
        # cannot login with another eppn
279
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppninvalid",
280
                          cn="Kostas Papadimitriou")
281
        r = client.get(ui_url("login/shibboleth?"), follow=True)
282
        self.assertFalse(r.context['request'].user.is_authenticated())
283

    
284
        # cannot
285

    
286
        # lets remove local password
287
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
288
                                       email="kpap@synnefo.org")
289
        remove_local_url = user.get_auth_provider('local').get_remove_url
290
        remove_shibbo_url = user.get_auth_provider('shibboleth',
291
                                                   'kpapeppn').get_remove_url
292
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
293
                          cn="Kostas Papadimtriou")
294
        r = client.get(ui_url("login/shibboleth?"), follow=True)
295
        client.reset_tokens()
296

    
297
        # only POST is allowed (for CSRF protection)
298
        r = client.get(remove_local_url, follow=True)
299
        self.assertEqual(r.status_code, 405)
300

    
301
        r = client.post(remove_local_url, follow=True)
302
        # 2 providers left
303
        self.assertEqual(user.auth_providers.count(), 1)
304
        # cannot remove last provider
305
        r = client.post(remove_shibbo_url)
306
        self.assertEqual(r.status_code, 403)
307
        self.client.logout()
308

    
309
        # cannot login using local credentials (notice we use another client)
310
        post_data = {'password': 'password',
311
                     'username': 'kpap@synnefo.org'}
312
        r = self.client.post(ui_url('local'), post_data, follow=True)
313
        self.assertFalse(r.context['request'].user.is_authenticated())
314

    
315
        # we can reenable the local provider by setting a password
316
        r = client.get(ui_url("password_change"), follow=True)
317
        r = client.post(ui_url("password_change"), {'new_password1': '111',
318
                                                    'new_password2': '111'},
319
                        follow=True)
320
        user = r.context['request'].user
321
        self.assertTrue(user.has_auth_provider('local'))
322
        self.assertTrue(user.has_auth_provider('shibboleth'))
323
        self.assertTrue(user.check_password('111'))
324
        self.assertTrue(user.has_usable_password())
325

    
326
        # change password via profile form
327
        r = client.post(ui_url("profile"), {
328
            'old_password': '111',
329
            'new_password': '',
330
            'new_password2': '',
331
            'change_password': 'on',
332
        }, follow=False)
333
        self.assertEqual(r.status_code, 200)
334
        self.assertFalse(r.context['profile_form'].is_valid())
335

    
336
        self.client.logout()
337

    
338
        # now we can login
339
        post_data = {'password': '111',
340
                     'username': 'kpap@synnefo.org'}
341
        r = self.client.post(ui_url('local'), post_data, follow=True)
342
        self.assertTrue(r.context['request'].user.is_authenticated())
343

    
344
        client.reset_tokens()
345

    
346
        # we cannot take over another shibboleth identifier
347
        user2 = get_local_user('another@synnefo.org')
348
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
349
        # login
350
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
351
                          cn="Kostas Papadimitriou")
352
        r = client.get(ui_url("login/shibboleth?"), follow=True)
353
        # try to assign existing shibboleth identifier of another user
354
        client.set_tokens(mail="kpap_second@shibboleth.gr",
355
                          eppn="existingeppn", cn="Kostas Papadimitriou")
356
        r = client.get(ui_url("login/shibboleth?"), follow=True)
357
        self.assertContains(r, "is already in use")
358

    
359

    
360
class TestLocal(TestCase):
361

    
362
    def setUp(self):
363
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
364
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
365
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
366
        settings.ASTAKOS_MODERATION_ENABLED = True
367

    
368
    def tearDown(self):
369
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
370
        AstakosUser.objects.all().delete()
371

    
372
    @im_settings(RECAPTCHA_ENABLED=True, RATELIMIT_RETRIES_ALLOWED=3)
373
    def test_login_ratelimit(self):
374
        credentials = {'username': 'γιού τι έφ', 'password': 'password'}
375
        r = self.client.post(ui_url('local'), credentials, follow=True)
376
        fields = r.context['login_form'].fields.keyOrder
377
        self.assertFalse('recaptcha_challenge_field' in fields)
378
        r = self.client.post(ui_url('local'), credentials, follow=True)
379
        fields = r.context['login_form'].fields.keyOrder
380
        self.assertFalse('recaptcha_challenge_field' in fields)
381
        r = self.client.post(ui_url('local'), credentials, follow=True)
382
        fields = r.context['login_form'].fields.keyOrder
383
        self.assertTrue('recaptcha_challenge_field' in fields)
384

    
385
    def test_no_moderation(self):
386
        # disable moderation
387
        astakos_settings.MODERATION_ENABLED = False
388

    
389
        # create a new user
390
        r = self.client.get(ui_url("signup"))
391
        self.assertEqual(r.status_code, 200)
392
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
393
                'password2': 'password', 'first_name': 'Kostas',
394
                'last_name': 'Mitroglou', 'provider': 'local'}
395
        r = self.client.post(ui_url("signup"), data)
396

    
397
        # user created
398
        self.assertEqual(AstakosUser.objects.count(), 1)
399
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
400
                                       email="kpap@synnefo.org")
401
        self.assertEqual(user.username, 'kpap@synnefo.org')
402
        self.assertEqual(user.has_auth_provider('local'), True)
403
        self.assertFalse(user.is_active)
404

    
405
        # user (but not admin) gets notified
406
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
407
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
408
        astakos_settings.MODERATION_ENABLED = True
409

    
410
    def test_email_case(self):
411
        data = {
412
            'email': 'kPap@synnefo.org',
413
            'password1': '1234',
414
            'password2': '1234'
415
        }
416

    
417
        form = forms.LocalUserCreationForm(data)
418
        self.assertTrue(form.is_valid())
419
        user = form.save()
420
        form.store_user(user, {})
421

    
422
        u = AstakosUser.objects.get()
423
        self.assertEqual(u.email, 'kPap@synnefo.org')
424
        self.assertEqual(u.username, 'kpap@synnefo.org')
425
        u.is_active = True
426
        u.email_verified = True
427
        u.save()
428

    
429
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
430
        login = forms.LoginForm(data=data)
431
        self.assertTrue(login.is_valid())
432

    
433
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
434
        login = forms.LoginForm(data=data)
435
        self.assertTrue(login.is_valid())
436

    
437
        data = {
438
            'email': 'kpap@synnefo.org',
439
            'password1': '1234',
440
            'password2': '1234'
441
        }
442
        form = forms.LocalUserCreationForm(data)
443
        self.assertFalse(form.is_valid())
444

    
445
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
446
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
447
    def test_local_provider(self):
448
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
449

    
450
        # create a user
451
        r = self.client.get(ui_url("signup"))
452
        self.assertEqual(r.status_code, 200)
453
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
454
                'password2': 'password', 'first_name': 'Kostas',
455
                'last_name': 'Mitroglou', 'provider': 'local'}
456
        r = self.client.post(ui_url("signup"), data)
457

    
458
        # user created
459
        self.assertEqual(AstakosUser.objects.count(), 1)
460
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
461
                                       email="kpap@synnefo.org")
462
        self.assertEqual(user.username, 'kpap@synnefo.org')
463
        self.assertEqual(user.has_auth_provider('local'), True)
464
        self.assertFalse(user.is_active)  # not activated
465
        self.assertFalse(user.email_verified)  # not verified
466
        self.assertTrue(user.activation_sent)  # activation automatically sent
467
        self.assertFalse(user.moderated)
468
        self.assertFalse(user.email_verified)
469

    
470
        # admin gets notified and activates the user from the command line
471
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
472
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
473
                                               'password': 'password'},
474
                             follow=True)
475
        self.assertContains(r, messages.VERIFICATION_SENT)
476
        backend = activation_backends.get_backend()
477

    
478
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
479
        backend.send_user_verification_email(user)
480

    
481
        # user activation fields updated and user gets notified via email
482
        user = AstakosUser.objects.get(pk=user.pk)
483
        self.assertTrue(user.activation_sent)
484
        self.assertFalse(user.email_verified)
485
        self.assertFalse(user.is_active)
486
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
487

    
488
        # user forgot she got registered and tries to submit registration
489
        # form. Notice the upper case in email
490
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
491
                'password2': 'password', 'first_name': 'Kostas',
492
                'last_name': 'Mitroglou', 'provider': 'local'}
493
        r = self.client.post(ui_url("signup"), data, follow=True)
494
        self.assertRedirects(r, reverse('login'))
495
        self.assertContains(r, messages.VERIFICATION_SENT)
496

    
497
        user = AstakosUser.objects.get()
498
        # previous user replaced
499
        self.assertTrue(user.activation_sent)
500
        self.assertFalse(user.email_verified)
501
        self.assertFalse(user.is_active)
502
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
503

    
504
        # hmmm, email exists; lets request a password change
505
        r = self.client.get(ui_url('local/password_reset'))
506
        self.assertEqual(r.status_code, 200)
507
        data = {'email': 'kpap@synnefo.org'}
508
        r = self.client.post(ui_url('local/password_reset'), data, follow=True)
509
        # she can't because account is not active yet
510
        self.assertContains(r, 'pending activation')
511

    
512
        # moderation is enabled and an activation email has already been sent
513
        # so user can trigger resend of the activation email
514
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
515
                            follow=True)
516
        self.assertContains(r, 'has been sent to your email address.')
517
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
518

    
519
        # also she cannot login
520
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
521
        r = self.client.post(ui_url('local'), data, follow=True)
522
        self.assertContains(r, 'Resend activation')
523
        self.assertFalse(r.context['request'].user.is_authenticated())
524
        self.assertFalse('_pithos2_a' in self.client.cookies)
525

    
526
        # user sees the message and resends activation
527
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
528
                            follow=True)
529
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
530

    
531
        # logged in user cannot activate another account
532
        tmp_user = get_local_user("test_existing_user@synnefo.org")
533
        tmp_client = Client()
534
        tmp_client.login(username="test_existing_user@synnefo.org",
535
                         password="password")
536
        r = tmp_client.get(user.get_activation_url(), follow=True)
537
        self.assertContains(r, messages.LOGGED_IN_WARNING)
538

    
539
        r = self.client.get(user.get_activation_url(), follow=True)
540
        # previous code got invalidated
541
        self.assertEqual(r.status_code, 404)
542

    
543
        user = AstakosUser.objects.get(pk=user.pk)
544
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
545
        r = self.client.get(user.get_activation_url(), follow=True)
546
        self.assertRedirects(r, reverse('login'))
547
        # user sees that account is pending approval from admins
548
        self.assertContains(r, messages.NOTIFICATION_SENT)
549
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
550

    
551
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
552
        result = backend.handle_moderation(user)
553
        backend.send_result_notifications(result, user)
554
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
555
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
556

    
557
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
558
        r = self.client.get(ui_url('profile'), follow=True)
559
        self.assertFalse(r.context['request'].user.is_authenticated())
560
        self.assertFalse('_pithos2_a' in self.client.cookies)
561
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
562

    
563
        user = AstakosUser.objects.get(pk=user.pk)
564
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
565
                                               'password': 'password'},
566
                             follow=True)
567
        # user activated and logged in, token cookie set
568
        self.assertTrue(r.context['request'].user.is_authenticated())
569
        self.assertTrue('_pithos2_a' in self.client.cookies)
570
        cookies = self.client.cookies
571
        self.assertTrue(quote(user.auth_token) in
572
                        cookies.get('_pithos2_a').value)
573
        r = self.client.get(ui_url('logout'), follow=True)
574
        r = self.client.get(ui_url(''), follow=True)
575
        self.assertRedirects(r, ui_url('login'))
576
        # user logged out, token cookie removed
577
        self.assertFalse(r.context['request'].user.is_authenticated())
578
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
579

    
580
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
581
        del self.client.cookies['_pithos2_a']
582

    
583
        # user can login
584
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
585
                                               'password': 'password'},
586
                             follow=True)
587
        self.assertTrue(r.context['request'].user.is_authenticated())
588
        self.assertTrue('_pithos2_a' in self.client.cookies)
589
        cookies = self.client.cookies
590
        self.assertTrue(quote(user.auth_token) in
591
                        cookies.get('_pithos2_a').value)
592
        self.client.get(ui_url('logout'), follow=True)
593

    
594
        # user forgot password
595
        old_pass = user.password
596
        r = self.client.get(ui_url('local/password_reset'))
597
        self.assertEqual(r.status_code, 200)
598
        r = self.client.post(ui_url('local/password_reset'),
599
                             {'email': 'kpap@synnefo.org'})
600
        self.assertEqual(r.status_code, 302)
601
        # email sent
602
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
603

    
604
        # user visits change password link
605
        user = AstakosUser.objects.get(pk=user.pk)
606
        r = self.client.get(user.get_password_reset_url())
607
        r = self.client.post(user.get_password_reset_url(),
608
                             {'new_password1': 'newpass',
609
                              'new_password2': 'newpass'})
610

    
611
        user = AstakosUser.objects.get(pk=user.pk)
612
        self.assertNotEqual(old_pass, user.password)
613

    
614
        # old pass is not usable
615
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
616
                                               'password': 'password'})
617
        self.assertContains(r, 'Please enter a correct username and password')
618
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
619
                                               'password': 'newpass'},
620
                             follow=True)
621
        self.assertTrue(r.context['request'].user.is_authenticated())
622
        self.client.logout()
623

    
624
        # tests of special local backends
625
        user = AstakosUser.objects.get(pk=user.pk)
626
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
627
        user.save()
628

    
629
        # non astakos local backends do not support password reset
630
        r = self.client.get(ui_url('local/password_reset'))
631
        self.assertEqual(r.status_code, 200)
632
        r = self.client.post(ui_url('local/password_reset'),
633
                             {'email': 'kpap@synnefo.org'})
634
        # she can't because account is not active yet
635
        self.assertContains(r, "Changing password is not")
636

    
637

    
638
class UserActionsTests(TestCase):
639

    
640
    def test_email_change(self):
641
        # to test existing email validation
642
        get_local_user('existing@synnefo.org')
643

    
644
        # local user
645
        user = get_local_user('kpap@synnefo.org')
646

    
647
        # login as kpap
648
        self.client.login(username='kpap@synnefo.org', password='password')
649
        r = self.client.get(ui_url('profile'), follow=True)
650
        user = r.context['request'].user
651
        self.assertTrue(user.is_authenticated())
652

    
653
        # change email is enabled
654
        r = self.client.get(ui_url('email_change'))
655
        self.assertEqual(r.status_code, 200)
656
        self.assertFalse(user.email_change_is_pending())
657

    
658
        # request email change to an existing email fails
659
        data = {'new_email_address': 'existing@synnefo.org'}
660
        r = self.client.post(ui_url('email_change'), data)
661
        self.assertContains(r, messages.EMAIL_USED)
662

    
663
        # proper email change
664
        data = {'new_email_address': 'kpap@gmail.com'}
665
        r = self.client.post(ui_url('email_change'), data, follow=True)
666
        self.assertRedirects(r, ui_url('profile'))
667
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
668
        change1 = EmailChange.objects.get()
669

    
670
        # user sees a warning
671
        r = self.client.get(ui_url('email_change'))
672
        self.assertEqual(r.status_code, 200)
673
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
674
        self.assertTrue(user.email_change_is_pending())
675

    
676
        # link was sent
677
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
678
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
679

    
680
        # proper email change
681
        data = {'new_email_address': 'kpap@yahoo.com'}
682
        r = self.client.post(ui_url('email_change'), data, follow=True)
683
        self.assertRedirects(r, ui_url('profile'))
684
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
685
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
686
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
687
        change2 = EmailChange.objects.get()
688

    
689
        r = self.client.get(change1.get_url())
690
        self.assertEquals(r.status_code, 404)
691
        self.client.logout()
692

    
693
        invalid_client = Client()
694
        r = invalid_client.post(ui_url('local?'),
695
                                {'username': 'existing@synnefo.org',
696
                                 'password': 'password'})
697
        r = invalid_client.get(change2.get_url(), follow=True)
698
        self.assertEquals(r.status_code, 403)
699

    
700
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
701
                             {'username': 'kpap@synnefo.org',
702
                              'password': 'password',
703
                              'next': change2.get_url()},
704
                             follow=True)
705
        self.assertRedirects(r, ui_url('profile'))
706
        user = r.context['request'].user
707
        self.assertEquals(user.email, 'kpap@yahoo.com')
708
        self.assertEquals(user.username, 'kpap@yahoo.com')
709

    
710
        self.client.logout()
711
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
712
                             {'username': 'kpap@synnefo.org',
713
                              'password': 'password',
714
                              'next': change2.get_url()},
715
                             follow=True)
716
        self.assertContains(r, "Please enter a correct username and password")
717
        self.assertEqual(user.emailchanges.count(), 0)
718

    
719
        AstakosUser.objects.all().delete()
720
        Group.objects.all().delete()
721

    
722

    
723
class TestAuthProviderViews(TestCase):
724

    
725
    def tearDown(self):
726
        AstakosUser.objects.all().delete()
727

    
728
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
729
                         AUTOMODERATE_POLICY=True)
730
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
731
                 FORCE_PROFILE_UPDATE=False)
732
    def test_user(self):
733
        Profile = AuthProviderPolicyProfile
734
        Pending = PendingThirdPartyUser
735
        User = AstakosUser
736

    
737
        User.objects.create(email="newuser@synnefo.org")
738
        get_local_user("olduser@synnefo.org")
739
        cl_olduser = ShibbolethClient()
740
        get_local_user("olduser2@synnefo.org")
741
        ShibbolethClient()
742
        cl_newuser = ShibbolethClient()
743
        cl_newuser2 = Client()
744

    
745
        academic_group, created = Group.objects.get_or_create(
746
            name='academic-login')
747
        academic_users = academic_group.user_set
748
        assert created
749
        policy_only_academic = Profile.objects.add_policy('academic_strict',
750
                                                          'shibboleth',
751
                                                          academic_group,
752
                                                          exclusive=True,
753
                                                          login=False,
754
                                                          add=False)
755

    
756
        # new academic user
757
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
758
        cl_newuser.set_tokens(eppn="newusereppn", mail="newuser@synnefo.org",
759
                              surname="Lastname")
760
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
761
        initial = r.context['signup_form'].initial
762
        pending = Pending.objects.get()
763
        self.assertEqual(initial.get('last_name'), 'Lastname')
764
        self.assertEqual(initial.get('email'), 'newuser@synnefo.org')
765
        identifier = pending.third_party_identifier
766
        signup_data = {'third_party_identifier': identifier,
767
                       'first_name': 'Academic',
768
                       'third_party_token': pending.token,
769
                       'last_name': 'New User',
770
                       'provider': 'shibboleth'}
771
        r = cl_newuser.post(ui_url('signup'), signup_data)
772
        self.assertContains(r, "This field is required", )
773
        signup_data['email'] = 'olduser@synnefo.org'
774
        r = cl_newuser.post(ui_url('signup'), signup_data)
775
        self.assertContains(r, "already an account with this email", )
776
        signup_data['email'] = 'newuser@synnefo.org'
777
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
778
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
779
        self.assertEqual(r.status_code, 404)
780
        newuser = User.objects.get(email="newuser@synnefo.org")
781
        activation_link = newuser.get_activation_url()
782
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
783

    
784
        # new non-academic user
785
        signup_data = {'first_name': 'Non Academic',
786
                       'last_name': 'New User',
787
                       'provider': 'local',
788
                       'password1': 'password',
789
                       'password2': 'password'}
790
        signup_data['email'] = 'olduser@synnefo.org'
791
        r = cl_newuser2.post(ui_url('signup'), signup_data)
792
        self.assertContains(r, 'There is already an account with this '
793
                               'email address')
794
        signup_data['email'] = 'newuser@synnefo.org'
795
        r = cl_newuser2.post(ui_url('signup/'), signup_data)
796
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
797
        r = self.client.get(activation_link, follow=True)
798
        self.assertEqual(r.status_code, 404)
799
        newuser = User.objects.get(email="newuser@synnefo.org")
800
        self.assertTrue(newuser.activation_sent)
801

    
802
        # activation sent, user didn't open verification url so additional
803
        # registrations invalidate the previous signups.
804
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
805
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
806
        pending = Pending.objects.get()
807
        identifier = pending.third_party_identifier
808
        signup_data = {'third_party_identifier': identifier,
809
                       'first_name': 'Academic',
810
                       'third_party_token': pending.token,
811
                       'last_name': 'New User',
812
                       'provider': 'shibboleth'}
813
        signup_data['email'] = 'newuser@synnefo.org'
814
        r = cl_newuser.post(ui_url('signup'), signup_data)
815
        self.assertEqual(r.status_code, 302)
816
        newuser = User.objects.get(email="newuser@synnefo.org")
817
        self.assertTrue(newuser.activation_sent)
818
        activation_link = newuser.get_activation_url()
819
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
820
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
821
        self.assertRedirects(r, ui_url('landing'))
822
        newuser = User.objects.get(email="newuser@synnefo.org")
823
        self.assertEqual(newuser.is_active, True)
824
        self.assertEqual(newuser.email_verified, True)
825
        cl_newuser.logout()
826

    
827
        # cannot reactivate if suspended
828
        newuser.is_active = False
829
        newuser.save()
830
        r = cl_newuser.get(newuser.get_activation_url())
831
        newuser = User.objects.get(email="newuser@synnefo.org")
832
        self.assertFalse(newuser.is_active)
833

    
834
        # release suspension
835
        newuser.is_active = True
836
        newuser.save()
837

    
838
        cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
839
        local = auth.get_provider('local', newuser)
840
        self.assertEqual(local.get_add_policy, False)
841
        self.assertEqual(local.get_login_policy, False)
842
        r = cl_newuser.get(local.get_add_url, follow=True)
843
        self.assertRedirects(r, ui_url('profile'))
844
        self.assertContains(r, 'disabled for your')
845

    
846
        cl_olduser.login(username='olduser@synnefo.org', password="password")
847
        r = cl_olduser.get(ui_url('profile'), follow=True)
848
        self.assertEqual(r.status_code, 200)
849
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
850
        self.assertContains(r, 'Your request is missing a unique token')
851
        cl_olduser.set_tokens(eppn="newusereppn")
852
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
853
        self.assertContains(r, 'already in use')
854
        cl_olduser.set_tokens(eppn="oldusereppn")
855
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
856
        self.assertContains(r, 'Academic login enabled for this account')
857

    
858
        user = User.objects.get(email="olduser@synnefo.org")
859
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
860
        local_provider = user.get_auth_provider('local')
861
        self.assertEqual(shib_provider.get_remove_policy, True)
862
        self.assertEqual(local_provider.get_remove_policy, True)
863

    
864
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
865
                                                          'shibboleth',
866
                                                          academic_group,
867
                                                          remove=False)
868
        user.groups.add(academic_group)
869
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
870
        local_provider = user.get_auth_provider('local')
871
        self.assertEqual(shib_provider.get_remove_policy, False)
872
        self.assertEqual(local_provider.get_remove_policy, True)
873
        self.assertEqual(local_provider.get_login_policy, False)
874

    
875
        cl_olduser.logout()
876
        login_data = {'username': 'olduser@synnefo.org',
877
                      'password': 'password'}
878
        r = cl_olduser.post(ui_url('local'), login_data, follow=True)
879
        self.assertContains(r, "login/shibboleth'>Academic login")
880
        Group.objects.all().delete()
881

    
882

    
883
class TestAuthProvidersAPI(TestCase):
884
    """
885
    Test auth_providers module API
886
    """
887

    
888
    def tearDown(self):
889
        Group.objects.all().delete()
890

    
891
    @im_settings(IM_MODULES=['local', 'shibboleth'])
892
    def test_create(self):
893
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
894
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
895

    
896
        module = 'shibboleth'
897
        identifier = 'SHIB_UUID'
898
        provider_params = {
899
            'affiliation': 'UNIVERSITY',
900
            'info': {'age': 27}
901
        }
902
        provider = auth.get_provider(module, user2, identifier,
903
                                     **provider_params)
904
        provider.add_to_user()
905
        provider = auth.get_provider(module, user, identifier,
906
                                     **provider_params)
907
        provider.add_to_user()
908
        user.email_verified = True
909
        user.save()
910
        self.assertRaises(Exception, provider.add_to_user)
911
        provider = user.get_auth_provider(module, identifier)
912
        self.assertEqual(user.get_auth_provider(
913
            module, identifier)._instance.info.get('age'), 27)
914

    
915
        module = 'local'
916
        identifier = None
917
        provider_params = {'auth_backend': 'ldap', 'info':
918
                          {'office': 'A1'}}
919
        provider = auth.get_provider(module, user, identifier,
920
                                     **provider_params)
921
        provider.add_to_user()
922
        self.assertFalse(provider.get_add_policy)
923
        self.assertRaises(Exception, provider.add_to_user)
924

    
925
        shib = user.get_auth_provider('shibboleth',
926
                                      'SHIB_UUID')
927
        self.assertTrue(shib.get_remove_policy)
928

    
929
        local = user.get_auth_provider('local')
930
        self.assertTrue(local.get_remove_policy)
931

    
932
        local.remove_from_user()
933
        self.assertFalse(shib.get_remove_policy)
934
        self.assertRaises(Exception, shib.remove_from_user)
935

    
936
        provider = user.get_auth_providers()[0]
937
        self.assertRaises(Exception, provider.add_to_user)
938

    
939
    @im_settings(IM_MODULES=['local', 'shibboleth'])
940
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
941
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
942
                                                 'group2'])
943
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
944
                        CREATION_GROUPS_POLICY=['localgroup-create',
945
                                                'group-create'])
946
    def test_add_groups(self):
947
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
948
        provider = auth.get_provider('shibboleth', user, 'test123')
949
        provider.add_to_user()
950
        user = AstakosUser.objects.get()
951
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
952
                         sorted([u'group1', u'group2', u'group-create']))
953

    
954
        local = auth.get_provider('local', user)
955
        local.add_to_user()
956
        provider = user.get_auth_provider('shibboleth')
957
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
958
        provider.remove_from_user()
959
        user = AstakosUser.objects.get()
960
        self.assertEqual(len(user.get_auth_providers()), 1)
961
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
962
                         sorted([u'group-create', u'localgroup']))
963

    
964
        local = user.get_auth_provider('local')
965
        self.assertRaises(Exception, local.remove_from_user)
966
        provider = auth.get_provider('shibboleth', user, 'test123')
967
        provider.add_to_user()
968
        user = AstakosUser.objects.get()
969
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
970
                         sorted([u'group-create', u'group1', u'group2',
971
                                 u'localgroup']))
972
        Group.objects.all().delete()
973

    
974
    @im_settings(IM_MODULES=['local', 'shibboleth'])
975
    def test_policies(self):
976
        group_old, created = Group.objects.get_or_create(name='olduser')
977

    
978
        astakos_settings.MODERATION_ENABLED = True
979
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
980
            ['academic-user']
981
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
982
            ['google-user']
983

    
984
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
985
        user.groups.add(group_old)
986
        user.add_auth_provider('local')
987

    
988
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
989
        user2.add_auth_provider('shibboleth', identifier='shibid')
990

    
991
        user3 = AstakosUser.objects.create(email="kpap3@synnefo.org")
992
        user3.groups.add(group_old)
993
        user3.add_auth_provider('local')
994
        user3.add_auth_provider('shibboleth', identifier='1234')
995

    
996
        self.assertTrue(user2.groups.get(name='academic-user'))
997
        self.assertFalse(user2.groups.filter(name='olduser').count())
998

    
999
        local = auth_providers.get_provider('local')
1000
        self.assertTrue(local.get_add_policy)
1001

    
1002
        academic_group = Group.objects.get(name='academic-user')
1003
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1004
                                                     academic_group,
1005
                                                     exclusive=True,
1006
                                                     add=False,
1007
                                                     login=False)
1008
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1009
                                                     academic_group,
1010
                                                     exclusive=True,
1011
                                                     login=False,
1012
                                                     add=False)
1013
        # no duplicate entry gets created
1014
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1015

    
1016
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1017
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1018
                                                     user2,
1019
                                                     remove=False)
1020
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1021

    
1022
        local = auth_providers.get_provider('local', user2)
1023
        google = auth_providers.get_provider('google', user2)
1024
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1025
        self.assertTrue(shibboleth.get_login_policy)
1026
        self.assertFalse(shibboleth.get_remove_policy)
1027
        self.assertFalse(local.get_add_policy)
1028
        self.assertFalse(local.get_add_policy)
1029
        self.assertFalse(google.get_add_policy)
1030

    
1031
        user2.groups.remove(Group.objects.get(name='academic-user'))
1032
        self.assertTrue(local.get_add_policy)
1033
        self.assertTrue(google.get_add_policy)
1034
        user2.groups.add(Group.objects.get(name='academic-user'))
1035

    
1036
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1037
                                                     user2,
1038
                                                     exclusive=True,
1039
                                                     add=True)
1040
        self.assertTrue(local.get_add_policy)
1041
        self.assertTrue(google.get_add_policy)
1042

    
1043
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1044
        self.assertFalse(local.get_automoderate_policy)
1045
        self.assertFalse(google.get_automoderate_policy)
1046
        self.assertTrue(shibboleth.get_automoderate_policy)
1047

    
1048
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1049
                  'GOOGLE_ADD_GROUPS_POLICY']:
1050
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1051

    
1052
    @shibboleth_settings(CREATE_POLICY=True)
1053
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1054
    def test_create_http(self):
1055
        # this should be wrapped inside a transaction
1056
        user = AstakosUser(email="test@test.com")
1057
        user.save()
1058
        provider = auth_providers.get_provider('shibboleth', user,
1059
                                               'test@academia.test')
1060
        provider.add_to_user()
1061
        user.get_auth_provider('shibboleth', 'test@academia.test')
1062
        provider = auth_providers.get_provider('local', user)
1063
        provider.add_to_user()
1064
        user.get_auth_provider('local')
1065

    
1066
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1067
        user = AstakosUser(email="test2@test.com")
1068
        user.save()
1069
        provider = auth_providers.get_provider('shibboleth', user,
1070
                                               'test@shibboleth.com',
1071
                                               **{'info': {'name':
1072
                                                           'User Test'}})
1073
        self.assertFalse(provider.get_create_policy)
1074
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1075
        self.assertTrue(provider.get_create_policy)
1076
        academic = provider.add_to_user()
1077

    
1078
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1079
    @shibboleth_settings(LIMIT_POLICY=2)
1080
    def test_policies(self):
1081
        user = get_local_user('kpap@synnefo.org')
1082
        user.add_auth_provider('shibboleth', identifier='1234')
1083
        user.add_auth_provider('shibboleth', identifier='12345')
1084

    
1085
        # default limit is 1
1086
        local = user.get_auth_provider('local')
1087
        self.assertEqual(local.get_add_policy, False)
1088

    
1089
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1090
        academic = user.get_auth_provider('shibboleth',
1091
                                          identifier='1234')
1092
        self.assertEqual(academic.get_add_policy, False)
1093
        newacademic = auth_providers.get_provider('shibboleth', user,
1094
                                                  identifier='123456')
1095
        self.assertEqual(newacademic.get_add_policy, True)
1096
        user.add_auth_provider('shibboleth', identifier='123456')
1097
        self.assertEqual(academic.get_add_policy, False)
1098
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1099

    
1100
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1101
    @shibboleth_settings(LIMIT_POLICY=2)
1102
    def test_messages(self):
1103
        user = get_local_user('kpap@synnefo.org')
1104
        user.add_auth_provider('shibboleth', identifier='1234')
1105
        user.add_auth_provider('shibboleth', identifier='12345')
1106
        provider = auth_providers.get_provider('shibboleth')
1107
        self.assertEqual(provider.get_message('title'), 'Academic')
1108
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1109
        # regenerate messages cache
1110
        provider = auth_providers.get_provider('shibboleth')
1111
        self.assertEqual(provider.get_message('title'), 'New title')
1112
        self.assertEqual(provider.get_message('login_title'),
1113
                         'New title LOGIN')
1114
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1115
        self.assertEqual(provider.get_module_icon,
1116
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1117
        self.assertEqual(provider.get_module_medium_icon,
1118
                         settings.MEDIA_URL +
1119
                         'im/auth/icons-medium/shibboleth.png')
1120

    
1121
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1122
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1123
        self.assertEqual(provider.get_method_details_msg,
1124
                         'Account: 12345')
1125
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1126
        self.assertEqual(provider.get_method_details_msg,
1127
                         'Account: 1234')
1128

    
1129
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1130
        self.assertEqual(provider.get_not_active_msg,
1131
                         "'Academic login' is disabled.")
1132

    
1133
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1134
    @shibboleth_settings(LIMIT_POLICY=2)
1135
    def test_templates(self):
1136
        user = get_local_user('kpap@synnefo.org')
1137
        user.add_auth_provider('shibboleth', identifier='1234')
1138
        user.add_auth_provider('shibboleth', identifier='12345')
1139

    
1140
        provider = auth_providers.get_provider('shibboleth')
1141
        self.assertEqual(provider.get_template('login'),
1142
                         'im/auth/shibboleth_login.html')
1143
        provider = auth_providers.get_provider('google')
1144
        self.assertEqual(provider.get_template('login'),
1145
                         'im/auth/generic_login.html')
1146

    
1147

    
1148
class TestActivationBackend(TestCase):
1149

    
1150
    def setUp(self):
1151
        # dummy call to pass through logging middleware
1152
        self.client.get(ui_url(''))
1153

    
1154
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1155
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1156
    def test_policies(self):
1157
        backend = activation_backends.get_backend()
1158

    
1159
        # email matches RE_USER_EMAIL_PATTERNS
1160
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1161
                               is_active=False, email_verified=False)
1162
        backend.handle_verification(user1, user1.verification_code)
1163
        self.assertEqual(user1.accepted_policy, 'email')
1164

    
1165
        # manually moderated
1166
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1167
                               is_active=False, email_verified=False)
1168

    
1169
        backend.handle_verification(user2, user2.verification_code)
1170
        self.assertEqual(user2.moderated, False)
1171
        backend.handle_moderation(user2)
1172
        self.assertEqual(user2.moderated, True)
1173
        self.assertEqual(user2.accepted_policy, 'manual')
1174

    
1175
        # autoaccept due to provider automoderate policy
1176
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1177
                               is_active=False, email_verified=False)
1178
        user3.auth_providers.all().delete()
1179
        user3.add_auth_provider('shibboleth', identifier='shib123')
1180
        backend.handle_verification(user3, user3.verification_code)
1181
        self.assertEqual(user3.moderated, True)
1182
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1183

    
1184
    @im_settings(MODERATION_ENABLED=False,
1185
                 MANAGERS=(('Manager',
1186
                            'manager@synnefo.org'),),
1187
                 HELPDESK=(('Helpdesk',
1188
                            'helpdesk@synnefo.org'),),
1189
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1190
    def test_without_moderation(self):
1191
        backend = activation_backends.get_backend()
1192
        form = backend.get_signup_form('local')
1193
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1194

    
1195
        user_data = {
1196
            'email': 'kpap@synnefo.org',
1197
            'first_name': 'Kostas Papas',
1198
            'password1': '123',
1199
            'password2': '123'
1200
        }
1201
        form = backend.get_signup_form('local', user_data)
1202
        user = form.save(commit=False)
1203
        form.store_user(user)
1204
        self.assertEqual(user.is_active, False)
1205
        self.assertEqual(user.email_verified, False)
1206

    
1207
        # step one, registration
1208
        result = backend.handle_registration(user)
1209
        user = AstakosUser.objects.get()
1210
        self.assertEqual(user.is_active, False)
1211
        self.assertEqual(user.email_verified, False)
1212
        self.assertTrue(user.verification_code)
1213
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1214
        backend.send_result_notifications(result, user)
1215
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1216
        self.assertEqual(len(mail.outbox), 1)
1217

    
1218
        # step two, verify email (automatically
1219
        # moderates/accepts user, since moderation is disabled)
1220
        user = AstakosUser.objects.get()
1221
        valid_code = user.verification_code
1222

    
1223
        # test invalid code
1224
        result = backend.handle_verification(user, valid_code)
1225
        backend.send_result_notifications(result, user)
1226
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1227
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1228
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1229
        # verification + activated + greeting = 3
1230
        self.assertEqual(len(mail.outbox), 3)
1231
        user = AstakosUser.objects.get()
1232
        self.assertEqual(user.is_active, True)
1233
        self.assertEqual(user.moderated, True)
1234
        self.assertTrue(user.moderated_at)
1235
        self.assertEqual(user.email_verified, True)
1236
        self.assertTrue(user.activation_sent)
1237

    
1238
    @im_settings(MODERATION_ENABLED=True,
1239
                 MANAGERS=(('Manager',
1240
                            'manager@synnefo.org'),),
1241
                 HELPDESK=(('Helpdesk',
1242
                            'helpdesk@synnefo.org'),),
1243
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1244
    def test_with_moderation(self):
1245

    
1246
        backend = activation_backends.get_backend()
1247
        form = backend.get_signup_form('local')
1248
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1249

    
1250
        user_data = {
1251
            'email': 'kpap@synnefo.org',
1252
            'first_name': 'Kostas Papas',
1253
            'password1': '123',
1254
            'password2': '123'
1255
        }
1256
        form = backend.get_signup_form(provider='local',
1257
                                       initial_data=user_data)
1258
        user = form.save(commit=False)
1259
        form.store_user(user)
1260
        self.assertEqual(user.is_active, False)
1261
        self.assertEqual(user.email_verified, False)
1262

    
1263
        # step one, registration
1264
        result = backend.handle_registration(user)
1265
        user = AstakosUser.objects.get()
1266
        self.assertEqual(user.is_active, False)
1267
        self.assertEqual(user.email_verified, False)
1268
        self.assertTrue(user.verification_code)
1269
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1270
        backend.send_result_notifications(result, user)
1271
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1272
        self.assertEqual(len(mail.outbox), 1)
1273

    
1274
        # step two, verifying email
1275
        user = AstakosUser.objects.get()
1276
        valid_code = user.verification_code
1277
        invalid_code = user.verification_code + 'invalid'
1278

    
1279
        # test invalid code
1280
        result = backend.handle_verification(user, invalid_code)
1281
        self.assertEqual(result.status, backend.Result.ERROR)
1282
        backend.send_result_notifications(result, user)
1283
        user = AstakosUser.objects.get()
1284
        self.assertEqual(user.is_active, False)
1285
        self.assertEqual(user.moderated, False)
1286
        self.assertEqual(user.moderated_at, None)
1287
        self.assertEqual(user.email_verified, False)
1288
        self.assertTrue(user.activation_sent)
1289

    
1290
        # test valid code
1291
        user = AstakosUser.objects.get()
1292
        result = backend.handle_verification(user, valid_code)
1293
        backend.send_result_notifications(result, user)
1294
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1295
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1296
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1297
        self.assertEqual(len(mail.outbox), 2)
1298
        user = AstakosUser.objects.get()
1299
        self.assertEqual(user.moderated, False)
1300
        self.assertEqual(user.moderated_at, None)
1301
        self.assertEqual(user.email_verified, True)
1302
        self.assertTrue(user.activation_sent)
1303

    
1304
        # test code reuse
1305
        result = backend.handle_verification(user, valid_code)
1306
        self.assertEqual(result.status, backend.Result.ERROR)
1307
        user = AstakosUser.objects.get()
1308
        self.assertEqual(user.is_active, False)
1309
        self.assertEqual(user.moderated, False)
1310
        self.assertEqual(user.moderated_at, None)
1311
        self.assertEqual(user.email_verified, True)
1312
        self.assertTrue(user.activation_sent)
1313

    
1314
        # valid code on verified user
1315
        user = AstakosUser.objects.get()
1316
        valid_code = user.verification_code
1317
        result = backend.handle_verification(user, valid_code)
1318
        self.assertEqual(result.status, backend.Result.ERROR)
1319

    
1320
        # step three, moderation user
1321
        user = AstakosUser.objects.get()
1322
        result = backend.handle_moderation(user)
1323
        backend.send_result_notifications(result, user)
1324

    
1325
        user = AstakosUser.objects.get()
1326
        self.assertEqual(user.is_active, True)
1327
        self.assertEqual(user.moderated, True)
1328
        self.assertTrue(user.moderated_at)
1329
        self.assertEqual(user.email_verified, True)
1330
        self.assertTrue(user.activation_sent)
1331

    
1332

    
1333
class TestWebloginRedirect(TestCase):
1334

    
1335
    @with_settings(settings, COOKIE_DOMAIN='.astakos.synnefo.org')
1336
    def test_restricts_domains(self):
1337
        get_local_user('user1@synnefo.org')
1338

    
1339
        # next url construct helpers
1340
        weblogin = lambda nxt: reverse('weblogin') + '?next=%s' % nxt
1341
        weblogin_quoted = lambda nxt: reverse('weblogin') + '?next=%s' % \
1342
            urllib.quote_plus(nxt)
1343

    
1344
        # common cases
1345
        invalid_domain = weblogin("https://www.invaliddomain.synnefo.org")
1346
        invalid_scheme = weblogin("customscheme://localhost")
1347
        invalid_scheme_with_valid_domain = \
1348
            weblogin("http://www.invaliddomain.com")
1349
        valid_scheme = weblogin("pithos://localhost/")
1350
        # to be used in assertRedirects
1351
        valid_scheme_quoted = weblogin_quoted("pithos://localhost/")
1352

    
1353
        # not authenticated, redirects to login which contains next param with
1354
        # additional nested quoted next params
1355
        r = self.client.get(valid_scheme, follow=True)
1356
        login_redirect = reverse('login') + '?next=' + \
1357
            urllib.quote_plus("http://testserver" + valid_scheme_quoted)
1358
        self.assertRedirects(r, login_redirect)
1359

    
1360
        # authenticate client
1361
        self.client.login(username="user1@synnefo.org", password="password")
1362

    
1363
        # valid scheme
1364
        r = self.client.get(valid_scheme, follow=True)
1365
        url = r.redirect_chain[1][0]
1366
        # scheme preserved
1367
        self.assertTrue(url.startswith('pithos://localhost/'))
1368
        # redirect contains token param
1369
        params = urlparse.urlparse(url.replace('pithos', 'https'),
1370
                                   scheme='https').query
1371
        params = urlparse.parse_qs(params)
1372
        self.assertEqual(params['token'][0],
1373
                         AstakosUser.objects.get().auth_token)
1374
        # does not contain uuid
1375
        # reverted for 0.14.2 to support old pithos desktop clients
1376
        #self.assertFalse('uuid' in params)
1377

    
1378
        # invalid cases
1379
        r = self.client.get(invalid_scheme, follow=True)
1380
        self.assertEqual(r.status_code, 403)
1381

    
1382
        r = self.client.get(invalid_scheme_with_valid_domain, follow=True)
1383
        self.assertEqual(r.status_code, 403)
1384

    
1385
        r = self.client.get(invalid_domain, follow=True)
1386
        self.assertEqual(r.status_code, 403)