Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ b08aadc0

History | View | Annotate | Download (63.8 kB)

1
# -*- coding: utf-8 -*-
2
# Copyright 2011, 2012, 2013 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
import urlparse
36
import urllib
37

    
38
from astakos.im.tests.common import *
39

    
40
ui_url = lambda url: '/' + astakos_settings.BASE_PATH + '/ui/%s' % url
41

    
42

    
43
class ShibbolethTests(TestCase):
44
    """
45
    Testing shibboleth authentication.
46
    """
47

    
48
    def setUp(self):
49
        self.client = ShibbolethClient()
50
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
51
        astakos_settings.MODERATION_ENABLED = True
52

    
53
    def tearDown(self):
54
        AstakosUser.objects.all().delete()
55

    
56
    @im_settings(FORCE_PROFILE_UPDATE=False)
57
    def test_create_account(self):
58

    
59
        client = ShibbolethClient()
60

    
61
        # shibboleth views validation
62
        # eepn required
63
        r = client.get(ui_url('login/shibboleth?'), follow=True)
64
        self.assertContains(r, messages.SHIBBOLETH_MISSING_USER_ID % {
65
            'domain': astakos_settings.BASE_HOST,
66
            'contact_email': settings.CONTACT_EMAIL
67
        })
68
        client.set_tokens(remote_user="kpapeppn", eppn="kpapeppn")
69

    
70
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
71
        # shibboleth user info required
72
        r = client.get(ui_url('login/shibboleth?'), follow=True)
73
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
74
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
75

    
76
        # shibboleth logged us in
77
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
78
                          cn="Kostas Papadimitriou",
79
                          ep_affiliation="Test Affiliation")
80
        r = client.get(ui_url('login/shibboleth?'), follow=True,
81
                       **{'HTTP_SHIB_CUSTOM_IDP_KEY': 'test'})
82
        token = PendingThirdPartyUser.objects.get().token
83
        self.assertRedirects(r, ui_url('signup?third_party_token=%s' % token))
84
        self.assertEqual(r.status_code, 200)
85

    
86
        # a new pending user created
87
        pending_user = PendingThirdPartyUser.objects.get(
88
            third_party_identifier="kpapeppn")
89
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
90
        # keep the token for future use
91
        token = pending_user.token
92
        # from now on no shibboleth headers are sent to the server
93
        client.reset_tokens()
94

    
95
        # this is the old way, it should fail, to avoid pending user take over
96
        r = client.get(ui_url('shibboleth/signup/%s' % pending_user.username))
97
        self.assertEqual(r.status_code, 404)
98

    
99
        # this is the signup unique url associated with the pending user
100
        # created
101
        r = client.get(ui_url('signup/?third_party_token=%s' % token))
102
        identifier = pending_user.third_party_identifier
103
        post_data = {'third_party_identifier': identifier,
104
                     'first_name': 'Kostas',
105
                     'third_party_token': token,
106
                     'last_name': 'Mitroglou',
107
                     'provider': 'shibboleth'}
108

    
109
        signup_url = reverse('signup')
110

    
111
        # invlid email
112
        post_data['email'] = 'kpap'
113
        r = client.post(signup_url, post_data)
114
        self.assertContains(r, token)
115

    
116
        # existing email
117
        existing_user = get_local_user('test@test.com')
118
        post_data['email'] = 'test@test.com'
119
        r = client.post(signup_url, post_data)
120
        self.assertContains(r, messages.EMAIL_USED)
121
        existing_user.delete()
122

    
123
        # and finally a valid signup
124
        post_data['email'] = 'kpap@synnefo.org'
125
        r = client.post(signup_url, post_data, follow=True)
126
        self.assertContains(r, messages.VERIFICATION_SENT)
127

    
128
        # entires commited as expected
129
        self.assertEqual(AstakosUser.objects.count(), 1)
130
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
131
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
132

    
133
        user = AstakosUser.objects.get()
134
        provider = user.get_auth_provider("shibboleth")
135
        headers = provider.provider_details['info']['headers']
136
        self.assertEqual(headers.get('SHIB_CUSTOM_IDP_KEY'), 'test')
137

    
138
        # provider info stored
139
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
140
        self.assertEqual(provider.affiliation, 'Test Affiliation')
141
        self.assertEqual(provider.info['email'], u'kpap@synnefo.org')
142
        self.assertEqual(provider.info['eppn'], u'kpapeppn')
143
        self.assertEqual(provider.info['name'], u'Kostas Papadimitriou')
144
        self.assertTrue('headers' in provider.info)
145

    
146
        # login (not activated yet)
147
        client.set_tokens(mail="kpap@synnefo.org", remote_user="kpapeppn",
148
                          cn="Kostas Papadimitriou")
149
        r = client.get(ui_url("login/shibboleth?"), follow=True)
150
        self.assertContains(r, 'is pending moderation')
151

    
152
        # admin activates the user
153
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
154
        backend = activation_backends.get_backend()
155
        activation_result = backend.verify_user(u, u.verification_code)
156
        activation_result = backend.accept_user(u)
157
        self.assertFalse(activation_result.is_error())
158
        backend.send_result_notifications(activation_result, u)
159
        self.assertEqual(u.is_active, True)
160

    
161
        # we see our profile
162
        r = client.get(ui_url("login/shibboleth?"), follow=True)
163
        self.assertRedirects(r, ui_url('landing'))
164
        self.assertEqual(r.status_code, 200)
165

    
166
    def test_existing(self):
167
        """
168
        Test adding of third party login to an existing account
169
        """
170

    
171
        # this is our existing user
172
        existing_user = get_local_user('kpap@synnefo.org')
173
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
174
        existing_inactive.is_active = False
175
        existing_inactive.save()
176

    
177
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
178
        existing_unverified.is_active = False
179
        existing_unverified.activation_sent = None
180
        existing_unverified.email_verified = False
181
        existing_unverified.is_verified = False
182
        existing_unverified.save()
183

    
184
        client = ShibbolethClient()
185
        # shibboleth logged us in, notice that we use different email
186
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
187
                          cn="Kostas Papadimitriou", )
188
        r = client.get(ui_url("login/shibboleth?"), follow=True)
189

    
190
        # a new pending user created
191
        pending_user = PendingThirdPartyUser.objects.get()
192
        token = pending_user.token
193
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
194
        pending_key = pending_user.token
195
        client.reset_tokens()
196
        self.assertRedirects(r, ui_url("signup?third_party_token=%s" % token))
197

    
198
        form = r.context['login_form']
199
        signupdata = copy.copy(form.initial)
200
        signupdata['email'] = 'kpap@synnefo.org'
201
        signupdata['third_party_token'] = token
202
        signupdata['provider'] = 'shibboleth'
203
        signupdata.pop('id', None)
204

    
205
        # the email exists to another user
206
        r = client.post(ui_url("signup"), signupdata)
207
        self.assertContains(r, "There is already an account with this email "
208
                               "address")
209
        # change the case, still cannot create
210
        signupdata['email'] = 'KPAP@synnefo.org'
211
        r = client.post(ui_url("signup"), signupdata)
212
        self.assertContains(r, "There is already an account with this email "
213
                               "address")
214
        # inactive user
215
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
216
        r = client.post(ui_url("signup"), signupdata)
217
        self.assertContains(r, "There is already an account with this email "
218
                               "address")
219

    
220
        # unverified user, this should pass, old entry will be deleted
221
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
222
        r = client.post(ui_url("signup"), signupdata)
223

    
224
        post_data = {'password': 'password',
225
                     'username': 'kpap@synnefo.org'}
226
        r = client.post(ui_url('local'), post_data, follow=True)
227
        self.assertTrue(r.context['request'].user.is_authenticated())
228
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
229
                          cn="Kostas Papadimitriou", )
230
        r = client.get(ui_url("login/shibboleth?"), follow=True)
231
        self.assertContains(r, "enabled for this account")
232
        client.reset_tokens()
233

    
234
        user = existing_user
235
        self.assertTrue(user.has_auth_provider('shibboleth'))
236
        self.assertTrue(user.has_auth_provider('local',
237
                                               auth_backend='astakos'))
238
        client.logout()
239

    
240
        # look Ma, i can login with both my shibboleth and local account
241
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
242
                          cn="Kostas Papadimitriou")
243
        r = client.get(ui_url("login/shibboleth?"), follow=True)
244
        self.assertTrue(r.context['request'].user.is_authenticated())
245
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
246
        self.assertRedirects(r, ui_url('landing'))
247
        self.assertEqual(r.status_code, 200)
248

    
249
        user = r.context['request'].user
250
        client.logout()
251
        client.reset_tokens()
252

    
253
        # logged out
254
        r = client.get(ui_url("profile"), follow=True)
255
        self.assertFalse(r.context['request'].user.is_authenticated())
256

    
257
        # login with local account also works
258
        post_data = {'password': 'password',
259
                     'username': 'kpap@synnefo.org'}
260
        r = self.client.post(ui_url('local'), post_data, follow=True)
261
        self.assertTrue(r.context['request'].user.is_authenticated())
262
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
263
        self.assertRedirects(r, ui_url('landing'))
264
        self.assertEqual(r.status_code, 200)
265

    
266
        # cannot add the same eppn
267
        client.set_tokens(mail="secondary@shibboleth.gr",
268
                          remote_user="kpapeppn",
269
                          cn="Kostas Papadimitriou", )
270
        r = client.get(ui_url("login/shibboleth?"), follow=True)
271
        self.assertRedirects(r, ui_url('landing'))
272
        self.assertTrue(r.status_code, 200)
273
        self.assertEquals(existing_user.auth_providers.count(), 2)
274

    
275
        # only one allowed by default
276
        client.set_tokens(mail="secondary@shibboleth.gr",
277
                          remote_user="kpapeppn2",
278
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
279
        prov = auth_providers.get_provider('shibboleth')
280
        r = client.get(ui_url("login/shibboleth?"), follow=True)
281
        self.assertContains(r, "Failed to add")
282
        self.assertRedirects(r, ui_url('profile'))
283
        self.assertTrue(r.status_code, 200)
284
        self.assertEquals(existing_user.auth_providers.count(), 2)
285
        client.logout()
286
        client.reset_tokens()
287

    
288
        # cannot login with another eppn
289
        client.set_tokens(mail="kpap@synnefo.org",
290
                          remote_user="kpapeppninvalid",
291
                          cn="Kostas Papadimitriou")
292
        r = client.get(ui_url("login/shibboleth?"), follow=True)
293
        self.assertFalse(r.context['request'].user.is_authenticated())
294

    
295
        # cannot
296

    
297
        # lets remove local password
298
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
299
                                       email="kpap@synnefo.org")
300
        remove_local_url = user.get_auth_provider('local').get_remove_url
301
        remove_shibbo_url = user.get_auth_provider('shibboleth',
302
                                                   'kpapeppn').get_remove_url
303
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
304
                          cn="Kostas Papadimtriou")
305
        r = client.get(ui_url("login/shibboleth?"), follow=True)
306
        client.reset_tokens()
307

    
308
        # only POST is allowed (for CSRF protection)
309
        r = client.get(remove_local_url, follow=True)
310
        self.assertEqual(r.status_code, 405)
311

    
312
        r = client.post(remove_local_url, follow=True)
313
        # 2 providers left
314
        self.assertEqual(user.auth_providers.count(), 1)
315
        # cannot remove last provider
316
        r = client.post(remove_shibbo_url)
317
        self.assertEqual(r.status_code, 403)
318
        self.client.logout()
319

    
320
        # cannot login using local credentials (notice we use another client)
321
        post_data = {'password': 'password',
322
                     'username': 'kpap@synnefo.org'}
323
        r = self.client.post(ui_url('local'), post_data, follow=True)
324
        self.assertFalse(r.context['request'].user.is_authenticated())
325

    
326
        # we can reenable the local provider by setting a password
327
        r = client.get(ui_url("password_change"), follow=True)
328
        r = client.post(ui_url("password_change"), {'new_password1': '111',
329
                                                    'new_password2': '111'},
330
                        follow=True)
331
        user = r.context['request'].user
332
        self.assertTrue(user.has_auth_provider('local'))
333
        self.assertTrue(user.has_auth_provider('shibboleth'))
334
        self.assertTrue(user.check_password('111'))
335
        self.assertTrue(user.has_usable_password())
336

    
337
        # change password via profile form
338
        r = client.post(ui_url("profile"), {
339
            'old_password': '111',
340
            'new_password': '',
341
            'new_password2': '',
342
            'change_password': 'on',
343
        }, follow=False)
344
        self.assertEqual(r.status_code, 200)
345
        self.assertFalse(r.context['profile_form'].is_valid())
346

    
347
        self.client.logout()
348

    
349
        # now we can login
350
        post_data = {'password': '111',
351
                     'username': 'kpap@synnefo.org'}
352
        r = self.client.post(ui_url('local'), post_data, follow=True)
353
        self.assertTrue(r.context['request'].user.is_authenticated())
354

    
355
        client.reset_tokens()
356

    
357
        # we cannot take over another shibboleth identifier
358
        user2 = get_local_user('another@synnefo.org')
359
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
360
        # login
361
        client.set_tokens(mail="kpap@shibboleth.gr", remote_user="kpapeppn",
362
                          cn="Kostas Papadimitriou")
363
        r = client.get(ui_url("login/shibboleth?"), follow=True)
364
        # try to assign existing shibboleth identifier of another user
365
        client.set_tokens(mail="kpap_second@shibboleth.gr",
366
                          remote_user="existingeppn",
367
                          cn="Kostas Papadimitriou")
368
        r = client.get(ui_url("login/shibboleth?"), follow=True)
369
        self.assertContains(r, "is already in use")
370

    
371

    
372
class TestLocal(TestCase):
373

    
374
    def setUp(self):
375
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
376
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
377
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
378
        settings.ASTAKOS_MODERATION_ENABLED = True
379

    
380
    def tearDown(self):
381
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
382
        AstakosUser.objects.all().delete()
383

    
384
    @im_settings(RECAPTCHA_ENABLED=True, RATELIMIT_RETRIES_ALLOWED=3)
385
    def test_login_ratelimit(self):
386
        from django.core.cache import cache
387
        [cache.delete(key) for key in cache._cache.keys()]
388

    
389
        credentials = {'username': 'γιού τι έφ', 'password': 'password'}
390
        r = self.client.post(ui_url('local'), credentials, follow=True)
391
        fields = r.context['login_form'].fields.keyOrder
392
        self.assertFalse('recaptcha_challenge_field' in fields)
393
        r = self.client.post(ui_url('local'), credentials, follow=True)
394
        fields = r.context['login_form'].fields.keyOrder
395
        self.assertFalse('recaptcha_challenge_field' in fields)
396
        r = self.client.post(ui_url('local'), credentials, follow=True)
397
        fields = r.context['login_form'].fields.keyOrder
398
        self.assertTrue('recaptcha_challenge_field' in fields)
399

    
400
    def test_no_moderation(self):
401
        # disable moderation
402
        astakos_settings.MODERATION_ENABLED = False
403

    
404
        # create a new user
405
        r = self.client.get(ui_url("signup"))
406
        self.assertEqual(r.status_code, 200)
407
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
408
                'password2': 'password', 'first_name': 'Kostas',
409
                'last_name': 'Mitroglou', 'provider': 'local'}
410
        r = self.client.post(ui_url("signup"), data)
411

    
412
        # user created
413
        self.assertEqual(AstakosUser.objects.count(), 1)
414
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
415
                                       email="kpap@synnefo.org")
416
        self.assertEqual(user.username, 'kpap@synnefo.org')
417
        self.assertEqual(user.has_auth_provider('local'), True)
418
        self.assertFalse(user.is_active)
419

    
420
        # user (but not admin) gets notified
421
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
422
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
423
        astakos_settings.MODERATION_ENABLED = True
424

    
425
    def test_email_case(self):
426
        data = {
427
            'email': 'kPap@synnefo.org',
428
            'password1': '1234',
429
            'password2': '1234'
430
        }
431

    
432
        form = forms.LocalUserCreationForm(data)
433
        self.assertTrue(form.is_valid())
434
        user = form.create_user()
435

    
436
        u = AstakosUser.objects.get()
437
        self.assertEqual(u.email, 'kPap@synnefo.org')
438
        self.assertEqual(u.username, 'kpap@synnefo.org')
439
        u.is_active = True
440
        u.email_verified = True
441
        u.save()
442

    
443
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
444
        login = forms.LoginForm(data=data)
445
        self.assertTrue(login.is_valid())
446

    
447
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
448
        login = forms.LoginForm(data=data)
449
        self.assertTrue(login.is_valid())
450

    
451
        data = {
452
            'email': 'kpap@synnefo.org',
453
            'password1': '1234',
454
            'password2': '1234'
455
        }
456
        form = forms.LocalUserCreationForm(data)
457
        self.assertFalse(form.is_valid())
458

    
459
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
460
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
461
    def test_local_provider(self):
462
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
463

    
464
        # create a user
465
        r = self.client.get(ui_url("signup"))
466
        self.assertEqual(r.status_code, 200)
467
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
468
                'password2': 'password', 'first_name': 'Kostas',
469
                'last_name': 'Mitroglou', 'provider': 'local'}
470
        r = self.client.post(ui_url("signup"), data)
471

    
472
        # user created
473
        self.assertEqual(AstakosUser.objects.count(), 1)
474
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
475
                                       email="kpap@synnefo.org")
476
        self.assertEqual(user.username, 'kpap@synnefo.org')
477
        self.assertEqual(user.has_auth_provider('local'), True)
478
        self.assertFalse(user.is_active)  # not activated
479
        self.assertFalse(user.email_verified)  # not verified
480
        self.assertTrue(user.activation_sent)  # activation automatically sent
481
        self.assertFalse(user.moderated)
482
        self.assertFalse(user.email_verified)
483

    
484
        # admin gets notified and activates the user from the command line
485
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
486
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
487
                                               'password': 'password'},
488
                             follow=True)
489
        self.assertContains(r, messages.VERIFICATION_SENT)
490
        backend = activation_backends.get_backend()
491

    
492
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
493
        backend.send_user_verification_email(user)
494

    
495
        # user activation fields updated and user gets notified via email
496
        user = AstakosUser.objects.get(pk=user.pk)
497
        self.assertTrue(user.activation_sent)
498
        self.assertFalse(user.email_verified)
499
        self.assertFalse(user.is_active)
500
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
501

    
502
        # user forgot she got registered and tries to submit registration
503
        # form. Notice the upper case in email
504
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
505
                'password2': 'password', 'first_name': 'Kostas',
506
                'last_name': 'Mitroglou', 'provider': 'local'}
507
        r = self.client.post(ui_url("signup"), data, follow=True)
508
        self.assertRedirects(r, reverse('login'))
509
        self.assertContains(r, messages.VERIFICATION_SENT)
510

    
511
        user = AstakosUser.objects.get()
512
        # previous user replaced
513
        self.assertTrue(user.activation_sent)
514
        self.assertFalse(user.email_verified)
515
        self.assertFalse(user.is_active)
516
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
517

    
518
        # hmmm, email exists; lets request a password change
519
        r = self.client.get(ui_url('local/password_reset'))
520
        self.assertEqual(r.status_code, 200)
521
        data = {'email': 'kpap@synnefo.org'}
522
        r = self.client.post(ui_url('local/password_reset'), data, follow=True)
523
        # she can't because account is not active yet
524
        self.assertContains(r, 'pending activation')
525

    
526
        # moderation is enabled and an activation email has already been sent
527
        # so user can trigger resend of the activation email
528
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
529
                            follow=True)
530
        self.assertContains(r, 'has been sent to your email address.')
531
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
532

    
533
        # also she cannot login
534
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
535
        r = self.client.post(ui_url('local'), data, follow=True)
536
        self.assertContains(r, 'Resend activation')
537
        self.assertFalse(r.context['request'].user.is_authenticated())
538
        self.assertFalse('_pithos2_a' in self.client.cookies)
539

    
540
        # user sees the message and resends activation
541
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
542
                            follow=True)
543
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
544

    
545
        # logged in user cannot activate another account
546
        tmp_user = get_local_user("test_existing_user@synnefo.org")
547
        tmp_client = Client()
548
        tmp_client.login(username="test_existing_user@synnefo.org",
549
                         password="password")
550
        r = tmp_client.get(user.get_activation_url(), follow=True)
551
        self.assertContains(r, messages.LOGGED_IN_WARNING)
552

    
553
        # empty activation code is not allowed
554
        r = self.client.get(user.get_activation_url().split("?")[0],
555
                            follow=True)
556
        self.assertEqual(r.status_code, 403)
557

    
558
        r = self.client.get(user.get_activation_url(), follow=True)
559
        # previous code got invalidated
560
        self.assertEqual(r.status_code, 404)
561

    
562
        user = AstakosUser.objects.get(pk=user.pk)
563
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
564
        r = self.client.get(user.get_activation_url(), follow=True)
565
        self.assertRedirects(r, reverse('login'))
566
        # user sees that account is pending approval from admins
567
        self.assertContains(r, messages.NOTIFICATION_SENT)
568
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
569

    
570
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
571
        result = backend.handle_moderation(user)
572
        backend.send_result_notifications(result, user)
573
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
574
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
575

    
576
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
577
        r = self.client.get(ui_url('profile'), follow=True)
578
        self.assertFalse(r.context['request'].user.is_authenticated())
579
        self.assertFalse('_pithos2_a' in self.client.cookies)
580
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
581

    
582
        user = AstakosUser.objects.get(pk=user.pk)
583
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
584
                                               'password': 'password'},
585
                             follow=True)
586
        # user activated and logged in, token cookie set
587
        self.assertTrue(r.context['request'].user.is_authenticated())
588
        self.assertTrue('_pithos2_a' in self.client.cookies)
589
        cookies = self.client.cookies
590
        self.assertTrue(quote(user.auth_token) in
591
                        cookies.get('_pithos2_a').value)
592
        r = self.client.get(ui_url('logout'), follow=True)
593
        r = self.client.get(ui_url(''), follow=True)
594
        self.assertRedirects(r, ui_url('login'))
595
        # user logged out, token cookie removed
596
        self.assertFalse(r.context['request'].user.is_authenticated())
597
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
598

    
599
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
600
        del self.client.cookies['_pithos2_a']
601

    
602
        # user can login
603
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
604
                                               'password': 'password'},
605
                             follow=True)
606
        self.assertTrue(r.context['request'].user.is_authenticated())
607
        self.assertTrue('_pithos2_a' in self.client.cookies)
608
        cookies = self.client.cookies
609
        self.assertTrue(quote(user.auth_token) in
610
                        cookies.get('_pithos2_a').value)
611
        self.client.get(ui_url('logout'), follow=True)
612

    
613
        # user forgot password
614
        old_pass = user.password
615
        r = self.client.get(ui_url('local/password_reset'))
616
        self.assertEqual(r.status_code, 200)
617
        r = self.client.post(ui_url('local/password_reset'),
618
                             {'email': 'kpap@synnefo.org'})
619
        self.assertEqual(r.status_code, 302)
620
        # email sent
621
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
622

    
623
        # user visits change password link
624
        user = AstakosUser.objects.get(pk=user.pk)
625
        r = self.client.get(user.get_password_reset_url())
626
        r = self.client.post(user.get_password_reset_url(),
627
                             {'new_password1': 'newpass',
628
                              'new_password2': 'newpass'})
629

    
630
        user = AstakosUser.objects.get(pk=user.pk)
631
        self.assertNotEqual(old_pass, user.password)
632

    
633
        # old pass is not usable
634
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
635
                                               'password': 'password'})
636
        self.assertContains(r, 'Please enter a correct username and password')
637
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
638
                                               'password': 'newpass'},
639
                             follow=True)
640
        self.assertTrue(r.context['request'].user.is_authenticated())
641
        self.client.logout()
642

    
643
        # tests of special local backends
644
        user = AstakosUser.objects.get(pk=user.pk)
645
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
646
        user.save()
647

    
648
        # non astakos local backends do not support password reset
649
        r = self.client.get(ui_url('local/password_reset'))
650
        self.assertEqual(r.status_code, 200)
651
        r = self.client.post(ui_url('local/password_reset'),
652
                             {'email': 'kpap@synnefo.org'})
653
        # she can't because account is not active yet
654
        self.assertContains(r, "Changing password is not")
655

    
656
    def test_fix_superuser(self):
657
        u = User.objects.create(username="dummy", email="email@example.org",
658
                                first_name="Super", last_name="User",
659
                                is_superuser=True)
660
        User.objects.create(username="dummy2", email="email2@example.org",
661
                            first_name="Other", last_name="User")
662
        fixed = auth_functions.fix_superusers()
663
        self.assertEqual(len(fixed), 1)
664
        fuser = fixed[0]
665
        self.assertEqual(fuser.email, fuser.username)
666

    
667

    
668
class UserActionsTests(TestCase):
669

    
670
    def test_email_change(self):
671
        # to test existing email validation
672
        get_local_user('existing@synnefo.org')
673

    
674
        # local user
675
        user = get_local_user('kpap@synnefo.org')
676

    
677
        # login as kpap
678
        self.client.login(username='kpap@synnefo.org', password='password')
679
        r = self.client.get(ui_url('profile'), follow=True)
680
        user = r.context['request'].user
681
        self.assertTrue(user.is_authenticated())
682

    
683
        # change email is enabled
684
        r = self.client.get(ui_url('email_change'))
685
        self.assertEqual(r.status_code, 200)
686
        self.assertFalse(user.email_change_is_pending())
687

    
688
        # request email change to an existing email fails
689
        data = {'new_email_address': 'existing@synnefo.org'}
690
        r = self.client.post(ui_url('email_change'), data)
691
        self.assertContains(r, messages.EMAIL_USED)
692

    
693
        # proper email change
694
        data = {'new_email_address': 'kpap@gmail.com'}
695
        r = self.client.post(ui_url('email_change'), data, follow=True)
696
        self.assertRedirects(r, ui_url('profile'))
697
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
698
        change1 = EmailChange.objects.get()
699

    
700
        # user sees a warning
701
        r = self.client.get(ui_url('email_change'))
702
        self.assertEqual(r.status_code, 200)
703
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
704
        self.assertTrue(user.email_change_is_pending())
705

    
706
        # link was sent
707
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
708
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
709

    
710
        # proper email change
711
        data = {'new_email_address': 'kpap@yahoo.com'}
712
        r = self.client.post(ui_url('email_change'), data, follow=True)
713
        self.assertRedirects(r, ui_url('profile'))
714
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
715
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
716
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
717
        change2 = EmailChange.objects.get()
718

    
719
        r = self.client.get(change1.get_url())
720
        self.assertEquals(r.status_code, 404)
721
        self.client.logout()
722

    
723
        invalid_client = Client()
724
        r = invalid_client.post(ui_url('local?'),
725
                                {'username': 'existing@synnefo.org',
726
                                 'password': 'password'})
727
        r = invalid_client.get(change2.get_url(), follow=True)
728
        self.assertEquals(r.status_code, 403)
729

    
730
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
731
                             {'username': 'kpap@synnefo.org',
732
                              'password': 'password',
733
                              'next': change2.get_url()},
734
                             follow=True)
735
        self.assertRedirects(r, ui_url('profile'))
736
        user = r.context['request'].user
737
        self.assertEquals(user.email, 'kpap@yahoo.com')
738
        self.assertEquals(user.username, 'kpap@yahoo.com')
739

    
740
        self.client.logout()
741
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
742
                             {'username': 'kpap@synnefo.org',
743
                              'password': 'password',
744
                              'next': change2.get_url()},
745
                             follow=True)
746
        self.assertContains(r, "Please enter a correct username and password")
747
        self.assertEqual(user.emailchanges.count(), 0)
748

    
749
        AstakosUser.objects.all().delete()
750
        Group.objects.all().delete()
751

    
752

    
753
TEST_TARGETED_ID1 = \
754
    "https://idp.synnefo.org/idp/shibboleth!ZWxhIHJlIGVsYSByZSBlbGEgcmU="
755
TEST_TARGETED_ID2 = \
756
    "https://idp.synnefo.org/idp/shibboleth!ZGUgc2UgeGFsYXNlLi4uLi4uLg=="
757
TEST_TARGETED_ID3 = \
758
    "https://idp.synnefo.org/idp/shibboleth!"
759

    
760

    
761
class TestAuthProviderViews(TestCase):
762

    
763
    def tearDown(self):
764
        AstakosUser.objects.all().delete()
765

    
766
    @im_settings(IM_MODULES=['shibboleth'], MODERATION_ENABLED=False,
767
                 SHIBBOLETH_MIGRATE_EPPN=True)
768
    def migrate_to_remote_id(self):
769
        eppn_user = get_local_user("eppn@synnefo.org")
770
        tid_user = get_local_user("tid@synnefo.org")
771
        eppn_user.add_auth_provider('shibboleth', 'EPPN')
772
        tid_user.add_auth_provider('shibboleth', TEST_TARGETED_ID1)
773

    
774
        get_user = lambda r: r.context['request'].user
775

    
776
        client = ShibbolethClient()
777
        client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID2)
778
        r = client.get(ui_url('login/shibboleth?'), follow=True)
779
        self.assertTrue(get_user(r).is_authenticated())
780
        self.assertEqual(eppn_user.get_auth_provider('shibboleth').identifier,
781
                         TEST_TARGETED_ID2)
782

    
783
        client = ShibbolethClient()
784
        client.set_tokens(eppn="EPPN", remote_user=TEST_TARGETED_ID1)
785
        r = client.get(ui_url('login/shibboleth?'), follow=True)
786
        self.assertTrue(get_user(r).is_authenticated())
787
        self.assertEqual(tid_user.get_auth_provider('shibboleth').identifier,
788
                         TEST_TARGETED_ID1)
789

    
790
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
791
                         AUTOMODERATE_POLICY=True)
792
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
793
                 FORCE_PROFILE_UPDATE=False)
794
    def test_user(self):
795
        Profile = AuthProviderPolicyProfile
796
        Pending = PendingThirdPartyUser
797
        User = AstakosUser
798

    
799
        auth_functions.make_user("newuser@synnefo.org")
800
        get_local_user("olduser@synnefo.org")
801
        cl_olduser = ShibbolethClient()
802
        get_local_user("olduser2@synnefo.org")
803
        ShibbolethClient()
804
        cl_newuser = ShibbolethClient()
805
        cl_newuser2 = Client()
806

    
807
        academic_group, created = Group.objects.get_or_create(
808
            name='academic-login')
809
        academic_users = academic_group.user_set
810
        assert created
811
        policy_only_academic = Profile.objects.add_policy('academic_strict',
812
                                                          'shibboleth',
813
                                                          academic_group,
814
                                                          exclusive=True,
815
                                                          login=False,
816
                                                          add=False)
817

    
818
        # new academic user
819
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
820
        cl_newuser.set_tokens(remote_user="newusereppn", mail="newuser@synnefo.org",
821
                              surname="Lastname")
822
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
823
        initial = r.context['signup_form'].initial
824
        pending = Pending.objects.get()
825
        self.assertEqual(initial.get('last_name'), 'Lastname')
826
        self.assertEqual(initial.get('email'), 'newuser@synnefo.org')
827
        identifier = pending.third_party_identifier
828
        signup_data = {'third_party_identifier': identifier,
829
                       'first_name': 'Academic',
830
                       'third_party_token': pending.token,
831
                       'last_name': 'New User',
832
                       'provider': 'shibboleth'}
833
        r = cl_newuser.post(ui_url('signup'), signup_data)
834
        self.assertContains(r, "This field is required", )
835
        signup_data['email'] = 'olduser@synnefo.org'
836
        r = cl_newuser.post(ui_url('signup'), signup_data)
837
        self.assertContains(r, "already an account with this email", )
838
        signup_data['email'] = 'newuser@synnefo.org'
839
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
840
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
841
        self.assertEqual(r.status_code, 404)
842
        newuser = User.objects.get(email="newuser@synnefo.org")
843
        activation_link = newuser.get_activation_url()
844
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
845

    
846
        # new non-academic user
847
        signup_data = {'first_name': 'Non Academic',
848
                       'last_name': 'New User',
849
                       'provider': 'local',
850
                       'password1': 'password',
851
                       'password2': 'password'}
852
        signup_data['email'] = 'olduser@synnefo.org'
853
        r = cl_newuser2.post(ui_url('signup'), signup_data)
854
        self.assertContains(r, 'There is already an account with this '
855
                               'email address')
856
        signup_data['email'] = 'newuser@synnefo.org'
857
        r = cl_newuser2.post(ui_url('signup/'), signup_data)
858
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
859
        r = self.client.get(activation_link, follow=True)
860
        self.assertEqual(r.status_code, 404)
861
        newuser = User.objects.get(email="newuser@synnefo.org")
862
        self.assertTrue(newuser.activation_sent)
863

    
864
        # activation sent, user didn't open verification url so additional
865
        # registrations invalidate the previous signups.
866
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
867
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
868
        pending = Pending.objects.get()
869
        identifier = pending.third_party_identifier
870
        signup_data = {'third_party_identifier': identifier,
871
                       'first_name': 'Academic',
872
                       'third_party_token': pending.token,
873
                       'last_name': 'New User',
874
                       'provider': 'shibboleth'}
875
        signup_data['email'] = 'newuser@synnefo.org'
876
        r = cl_newuser.post(ui_url('signup'), signup_data)
877
        self.assertEqual(r.status_code, 302)
878
        newuser = User.objects.get(email="newuser@synnefo.org")
879
        self.assertTrue(newuser.activation_sent)
880
        activation_link = newuser.get_activation_url()
881
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
882
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
883
        self.assertRedirects(r, ui_url('landing'))
884
        newuser = User.objects.get(email="newuser@synnefo.org")
885
        self.assertEqual(newuser.is_active, True)
886
        self.assertEqual(newuser.email_verified, True)
887
        cl_newuser.logout()
888

    
889
        # cannot reactivate if suspended
890
        newuser.is_active = False
891
        newuser.save()
892
        r = cl_newuser.get(newuser.get_activation_url())
893
        newuser = User.objects.get(email="newuser@synnefo.org")
894
        self.assertFalse(newuser.is_active)
895

    
896
        # release suspension
897
        newuser.is_active = True
898
        newuser.save()
899

    
900
        cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
901
        local = auth.get_provider('local', newuser)
902
        self.assertEqual(local.get_add_policy, False)
903
        self.assertEqual(local.get_login_policy, False)
904
        r = cl_newuser.get(local.get_add_url, follow=True)
905
        self.assertRedirects(r, ui_url('profile'))
906
        self.assertContains(r, 'disabled for your')
907

    
908
        cl_olduser.login(username='olduser@synnefo.org', password="password")
909
        r = cl_olduser.get(ui_url('profile'), follow=True)
910
        self.assertEqual(r.status_code, 200)
911
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
912
        self.assertContains(r, 'Your request is missing a unique token')
913
        cl_olduser.set_tokens(remote_user="newusereppn")
914
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
915
        self.assertContains(r, 'already in use')
916
        cl_olduser.set_tokens(remote_user="oldusereppn")
917
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
918
        self.assertContains(r, 'Academic login enabled for this account')
919

    
920
        user = User.objects.get(email="olduser@synnefo.org")
921
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
922
        local_provider = user.get_auth_provider('local')
923
        self.assertEqual(shib_provider.get_remove_policy, True)
924
        self.assertEqual(local_provider.get_remove_policy, True)
925

    
926
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
927
                                                          'shibboleth',
928
                                                          academic_group,
929
                                                          remove=False)
930
        user.groups.add(academic_group)
931
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
932
        local_provider = user.get_auth_provider('local')
933
        self.assertEqual(shib_provider.get_remove_policy, False)
934
        self.assertEqual(local_provider.get_remove_policy, True)
935
        self.assertEqual(local_provider.get_login_policy, False)
936

    
937
        cl_olduser.logout()
938
        login_data = {'username': 'olduser@synnefo.org',
939
                      'password': 'password'}
940
        r = cl_olduser.post(ui_url('local'), login_data, follow=True)
941
        self.assertContains(r, "login/shibboleth'>Academic login")
942
        Group.objects.all().delete()
943

    
944

    
945
class TestAuthProvidersAPI(TestCase):
946
    """
947
    Test auth_providers module API
948
    """
949

    
950
    def tearDown(self):
951
        Group.objects.all().delete()
952

    
953
    @im_settings(IM_MODULES=['local', 'shibboleth'])
954
    def test_create(self):
955
        user = auth_functions.make_user(email="kpap@synnefo.org")
956
        user2 = auth_functions.make_user(email="kpap2@synnefo.org")
957

    
958
        module = 'shibboleth'
959
        identifier = 'SHIB_UUID'
960
        provider_params = {
961
            'affiliation': 'UNIVERSITY',
962
            'info': {'age': 27}
963
        }
964
        provider = auth.get_provider(module, user2, identifier,
965
                                     **provider_params)
966
        provider.add_to_user()
967
        provider = auth.get_provider(module, user, identifier,
968
                                     **provider_params)
969
        provider.add_to_user()
970
        user.email_verified = True
971
        user.save()
972
        self.assertRaises(Exception, provider.add_to_user)
973
        provider = user.get_auth_provider(module, identifier)
974
        self.assertEqual(user.get_auth_provider(
975
            module, identifier)._instance.info.get('age'), 27)
976

    
977
        module = 'local'
978
        identifier = None
979
        provider_params = {'auth_backend': 'ldap', 'info':
980
                          {'office': 'A1'}}
981
        provider = auth.get_provider(module, user, identifier,
982
                                     **provider_params)
983
        provider.add_to_user()
984
        self.assertFalse(provider.get_add_policy)
985
        self.assertRaises(Exception, provider.add_to_user)
986

    
987
        shib = user.get_auth_provider('shibboleth',
988
                                      'SHIB_UUID')
989
        self.assertTrue(shib.get_remove_policy)
990

    
991
        local = user.get_auth_provider('local')
992
        self.assertTrue(local.get_remove_policy)
993

    
994
        local.remove_from_user()
995
        self.assertFalse(shib.get_remove_policy)
996
        self.assertRaises(Exception, shib.remove_from_user)
997

    
998
        provider = user.get_auth_providers()[0]
999
        self.assertRaises(Exception, provider.add_to_user)
1000

    
1001
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1002
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
1003
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
1004
                                                 'group2'])
1005
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
1006
                        CREATION_GROUPS_POLICY=['localgroup-create',
1007
                                                'group-create'])
1008
    def test_add_groups(self):
1009
        user = auth_functions.make_user("kpap@synnefo.org")
1010
        provider = auth.get_provider('shibboleth', user, 'test123')
1011
        provider.add_to_user()
1012
        user = AstakosUser.objects.get()
1013
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1014
                         sorted([u'group1', u'group2', u'group-create']))
1015

    
1016
        local = auth.get_provider('local', user)
1017
        local.add_to_user()
1018
        provider = user.get_auth_provider('shibboleth')
1019
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1020
        provider.remove_from_user()
1021
        user = AstakosUser.objects.get()
1022
        self.assertEqual(len(user.get_auth_providers()), 1)
1023
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1024
                         sorted([u'group-create', u'localgroup']))
1025

    
1026
        local = user.get_auth_provider('local')
1027
        self.assertRaises(Exception, local.remove_from_user)
1028
        provider = auth.get_provider('shibboleth', user, 'test123')
1029
        provider.add_to_user()
1030
        user = AstakosUser.objects.get()
1031
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1032
                         sorted([u'group-create', u'group1', u'group2',
1033
                                 u'localgroup']))
1034
        Group.objects.all().delete()
1035

    
1036
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1037
    def test_policies(self):
1038
        group_old, created = Group.objects.get_or_create(name='olduser')
1039

    
1040
        astakos_settings.MODERATION_ENABLED = True
1041
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1042
            ['academic-user']
1043
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1044
            ['google-user']
1045

    
1046
        user = auth_functions.make_user("kpap@synnefo.org")
1047
        user.groups.add(group_old)
1048
        user.add_auth_provider('local')
1049

    
1050
        user2 = auth_functions.make_user("kpap2@synnefo.org")
1051
        user2.add_auth_provider('shibboleth', identifier='shibid')
1052

    
1053
        user3 = auth_functions.make_user("kpap3@synnefo.org")
1054
        user3.groups.add(group_old)
1055
        user3.add_auth_provider('local')
1056
        user3.add_auth_provider('shibboleth', identifier='1234')
1057

    
1058
        self.assertTrue(user2.groups.get(name='academic-user'))
1059
        self.assertFalse(user2.groups.filter(name='olduser').count())
1060

    
1061
        local = auth_providers.get_provider('local')
1062
        self.assertTrue(local.get_add_policy)
1063

    
1064
        academic_group = Group.objects.get(name='academic-user')
1065
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1066
                                                     academic_group,
1067
                                                     exclusive=True,
1068
                                                     add=False,
1069
                                                     login=False)
1070
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1071
                                                     academic_group,
1072
                                                     exclusive=True,
1073
                                                     login=False,
1074
                                                     add=False)
1075
        # no duplicate entry gets created
1076
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1077

    
1078
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1079
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1080
                                                     user2,
1081
                                                     remove=False)
1082
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1083

    
1084
        local = auth_providers.get_provider('local', user2)
1085
        google = auth_providers.get_provider('google', user2)
1086
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1087
        self.assertTrue(shibboleth.get_login_policy)
1088
        self.assertFalse(shibboleth.get_remove_policy)
1089
        self.assertFalse(local.get_add_policy)
1090
        self.assertFalse(local.get_add_policy)
1091
        self.assertFalse(google.get_add_policy)
1092

    
1093
        user2.groups.remove(Group.objects.get(name='academic-user'))
1094
        self.assertTrue(local.get_add_policy)
1095
        self.assertTrue(google.get_add_policy)
1096
        user2.groups.add(Group.objects.get(name='academic-user'))
1097

    
1098
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1099
                                                     user2,
1100
                                                     exclusive=True,
1101
                                                     add=True)
1102
        self.assertTrue(local.get_add_policy)
1103
        self.assertTrue(google.get_add_policy)
1104

    
1105
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1106
        self.assertFalse(local.get_automoderate_policy)
1107
        self.assertFalse(google.get_automoderate_policy)
1108
        self.assertTrue(shibboleth.get_automoderate_policy)
1109

    
1110
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1111
                  'GOOGLE_ADD_GROUPS_POLICY']:
1112
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1113

    
1114
    @shibboleth_settings(CREATE_POLICY=True)
1115
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1116
    def test_create_http(self):
1117
        # this should be wrapped inside a transaction
1118
        user = auth_functions.make_user(email="test@test.com")
1119
        provider = auth_providers.get_provider('shibboleth', user,
1120
                                               'test@academia.test')
1121
        provider.add_to_user()
1122
        user.get_auth_provider('shibboleth', 'test@academia.test')
1123
        provider = auth_providers.get_provider('local', user)
1124
        provider.add_to_user()
1125
        user.get_auth_provider('local')
1126

    
1127
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1128
        user = auth_functions.make_user("test2@test.com")
1129
        provider = auth_providers.get_provider('shibboleth', user,
1130
                                               'test@shibboleth.com',
1131
                                               **{'info': {'name':
1132
                                                           'User Test'}})
1133
        self.assertFalse(provider.get_create_policy)
1134
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1135
        self.assertTrue(provider.get_create_policy)
1136
        academic = provider.add_to_user()
1137

    
1138
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1139
    @shibboleth_settings(LIMIT_POLICY=2)
1140
    def test_policies(self):
1141
        user = get_local_user('kpap@synnefo.org')
1142
        user.add_auth_provider('shibboleth', identifier='1234')
1143
        user.add_auth_provider('shibboleth', identifier='12345')
1144

    
1145
        # default limit is 1
1146
        local = user.get_auth_provider('local')
1147
        self.assertEqual(local.get_add_policy, False)
1148

    
1149
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1150
        academic = user.get_auth_provider('shibboleth',
1151
                                          identifier='1234')
1152
        self.assertEqual(academic.get_add_policy, False)
1153
        newacademic = auth_providers.get_provider('shibboleth', user,
1154
                                                  identifier='123456')
1155
        self.assertEqual(newacademic.get_add_policy, True)
1156
        user.add_auth_provider('shibboleth', identifier='123456')
1157
        self.assertEqual(academic.get_add_policy, False)
1158
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1159

    
1160
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1161
    @shibboleth_settings(LIMIT_POLICY=2)
1162
    def test_messages(self):
1163
        user = get_local_user('kpap@synnefo.org')
1164
        user.add_auth_provider('shibboleth', identifier='1234')
1165
        user.add_auth_provider('shibboleth', identifier='12345')
1166
        provider = auth_providers.get_provider('shibboleth')
1167
        self.assertEqual(provider.get_message('title'), 'Academic')
1168
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1169
        # regenerate messages cache
1170
        provider = auth_providers.get_provider('shibboleth')
1171
        self.assertEqual(provider.get_message('title'), 'New title')
1172
        self.assertEqual(provider.get_message('login_title'),
1173
                         'New title LOGIN')
1174
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1175
        self.assertEqual(provider.get_module_icon,
1176
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1177
        self.assertEqual(provider.get_module_medium_icon,
1178
                         settings.MEDIA_URL +
1179
                         'im/auth/icons-medium/shibboleth.png')
1180

    
1181
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1182
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1183
        self.assertEqual(provider.get_method_details_msg,
1184
                         'Account: 12345')
1185
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1186
        self.assertEqual(provider.get_method_details_msg,
1187
                         'Account: 1234')
1188

    
1189
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1190
        self.assertEqual(provider.get_not_active_msg,
1191
                         "'Academic login' is disabled.")
1192

    
1193
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1194
    @shibboleth_settings(LIMIT_POLICY=2)
1195
    def test_templates(self):
1196
        user = get_local_user('kpap@synnefo.org')
1197
        user.add_auth_provider('shibboleth', identifier='1234')
1198
        user.add_auth_provider('shibboleth', identifier='12345')
1199

    
1200
        provider = auth_providers.get_provider('shibboleth')
1201
        self.assertEqual(provider.get_template('login'),
1202
                         'im/auth/shibboleth_login.html')
1203
        provider = auth_providers.get_provider('google')
1204
        self.assertEqual(provider.get_template('login'),
1205
                         'im/auth/generic_login.html')
1206

    
1207

    
1208
class TestActivationBackend(TestCase):
1209

    
1210
    def setUp(self):
1211
        # dummy call to pass through logging middleware
1212
        self.client.get(ui_url(''))
1213

    
1214
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1215
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1216
    def test_policies(self):
1217
        backend = activation_backends.get_backend()
1218

    
1219
        # email matches RE_USER_EMAIL_PATTERNS
1220
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1221
                               is_active=False, email_verified=False)
1222
        backend.handle_verification(user1, user1.verification_code)
1223
        self.assertEqual(user1.accepted_policy, 'email')
1224

    
1225
        # manually moderated
1226
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1227
                               is_active=False, email_verified=False)
1228

    
1229
        backend.handle_verification(user2, user2.verification_code)
1230
        self.assertEqual(user2.moderated, False)
1231
        backend.handle_moderation(user2)
1232
        self.assertEqual(user2.moderated, True)
1233
        self.assertEqual(user2.accepted_policy, 'manual')
1234

    
1235
        # autoaccept due to provider automoderate policy
1236
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1237
                               is_active=False, email_verified=False)
1238
        user3.auth_providers.all().delete()
1239
        user3.add_auth_provider('shibboleth', identifier='shib123')
1240
        backend.handle_verification(user3, user3.verification_code)
1241
        self.assertEqual(user3.moderated, True)
1242
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1243

    
1244
    @im_settings(MODERATION_ENABLED=False,
1245
                 MANAGERS=(('Manager',
1246
                            'manager@synnefo.org'),),
1247
                 HELPDESK=(('Helpdesk',
1248
                            'helpdesk@synnefo.org'),),
1249
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1250
    def test_without_moderation(self):
1251
        backend = activation_backends.get_backend()
1252
        form = backend.get_signup_form('local')
1253
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1254

    
1255
        user_data = {
1256
            'email': 'kpap@synnefo.org',
1257
            'first_name': 'Kostas Papas',
1258
            'password1': '123',
1259
            'password2': '123'
1260
        }
1261
        form = backend.get_signup_form('local', user_data)
1262
        user = form.create_user()
1263
        self.assertEqual(user.is_active, False)
1264
        self.assertEqual(user.email_verified, False)
1265

    
1266
        # step one, registration
1267
        result = backend.handle_registration(user)
1268
        user = AstakosUser.objects.get()
1269
        self.assertEqual(user.is_active, False)
1270
        self.assertEqual(user.email_verified, False)
1271
        self.assertTrue(user.verification_code)
1272
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1273
        backend.send_result_notifications(result, user)
1274
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1275
        self.assertEqual(len(mail.outbox), 1)
1276

    
1277
        # step two, verify email (automatically
1278
        # moderates/accepts user, since moderation is disabled)
1279
        user = AstakosUser.objects.get()
1280
        valid_code = user.verification_code
1281

    
1282
        # test invalid code
1283
        result = backend.handle_verification(user, valid_code)
1284
        backend.send_result_notifications(result, user)
1285
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1286
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1287
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1288
        # verification + activated + greeting = 3
1289
        self.assertEqual(len(mail.outbox), 3)
1290
        user = AstakosUser.objects.get()
1291
        self.assertEqual(user.is_active, True)
1292
        self.assertEqual(user.moderated, True)
1293
        self.assertTrue(user.moderated_at)
1294
        self.assertEqual(user.email_verified, True)
1295
        self.assertTrue(user.activation_sent)
1296

    
1297
    @im_settings(MODERATION_ENABLED=True,
1298
                 MANAGERS=(('Manager',
1299
                            'manager@synnefo.org'),),
1300
                 HELPDESK=(('Helpdesk',
1301
                            'helpdesk@synnefo.org'),),
1302
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1303
    def test_with_moderation(self):
1304

    
1305
        backend = activation_backends.get_backend()
1306
        form = backend.get_signup_form('local')
1307
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1308

    
1309
        user_data = {
1310
            'email': 'kpap@synnefo.org',
1311
            'first_name': 'Kostas Papas',
1312
            'password1': '123',
1313
            'password2': '123'
1314
        }
1315
        form = backend.get_signup_form(provider='local',
1316
                                       initial_data=user_data)
1317
        user = form.create_user()
1318
        self.assertEqual(user.is_active, False)
1319
        self.assertEqual(user.email_verified, False)
1320

    
1321
        # step one, registration
1322
        result = backend.handle_registration(user)
1323
        user = AstakosUser.objects.get()
1324
        self.assertEqual(user.is_active, False)
1325
        self.assertEqual(user.email_verified, False)
1326
        self.assertTrue(user.verification_code)
1327
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1328
        backend.send_result_notifications(result, user)
1329
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1330
        self.assertEqual(len(mail.outbox), 1)
1331

    
1332
        # step two, verifying email
1333
        user = AstakosUser.objects.get()
1334
        valid_code = user.verification_code
1335
        invalid_code = user.verification_code + 'invalid'
1336

    
1337
        # test invalid code
1338
        result = backend.handle_verification(user, invalid_code)
1339
        self.assertEqual(result.status, backend.Result.ERROR)
1340
        backend.send_result_notifications(result, user)
1341
        user = AstakosUser.objects.get()
1342
        self.assertEqual(user.is_active, False)
1343
        self.assertEqual(user.moderated, False)
1344
        self.assertEqual(user.moderated_at, None)
1345
        self.assertEqual(user.email_verified, False)
1346
        self.assertTrue(user.activation_sent)
1347

    
1348
        # test valid code
1349
        user = AstakosUser.objects.get()
1350
        result = backend.handle_verification(user, valid_code)
1351
        backend.send_result_notifications(result, user)
1352
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1353
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1354
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1355
        self.assertEqual(len(mail.outbox), 2)
1356
        user = AstakosUser.objects.get()
1357
        self.assertEqual(user.moderated, False)
1358
        self.assertEqual(user.moderated_at, None)
1359
        self.assertEqual(user.email_verified, True)
1360
        self.assertTrue(user.activation_sent)
1361

    
1362
        # test code reuse
1363
        result = backend.handle_verification(user, valid_code)
1364
        self.assertEqual(result.status, backend.Result.ERROR)
1365
        user = AstakosUser.objects.get()
1366
        self.assertEqual(user.is_active, False)
1367
        self.assertEqual(user.moderated, False)
1368
        self.assertEqual(user.moderated_at, None)
1369
        self.assertEqual(user.email_verified, True)
1370
        self.assertTrue(user.activation_sent)
1371

    
1372
        # valid code on verified user
1373
        user = AstakosUser.objects.get()
1374
        valid_code = user.verification_code
1375
        result = backend.handle_verification(user, valid_code)
1376
        self.assertEqual(result.status, backend.Result.ERROR)
1377

    
1378
        # step three, moderation user
1379
        user = AstakosUser.objects.get()
1380
        result = backend.handle_moderation(user)
1381
        backend.send_result_notifications(result, user)
1382

    
1383
        user = AstakosUser.objects.get()
1384
        self.assertEqual(user.is_active, True)
1385
        self.assertEqual(user.moderated, True)
1386
        self.assertTrue(user.moderated_at)
1387
        self.assertEqual(user.email_verified, True)
1388
        self.assertTrue(user.activation_sent)
1389

    
1390

    
1391
class TestWebloginRedirect(TestCase):
1392

    
1393
    @with_settings(settings, COOKIE_DOMAIN='.astakos.synnefo.org')
1394
    def test_restricts_domains(self):
1395
        get_local_user('user1@synnefo.org')
1396

    
1397
        # next url construct helpers
1398
        weblogin = lambda nxt: reverse('weblogin') + '?next=%s' % nxt
1399
        weblogin_quoted = lambda nxt: reverse('weblogin') + '?next=%s' % \
1400
            urllib.quote_plus(nxt)
1401

    
1402
        # common cases
1403
        invalid_domain = weblogin("https://www.invaliddomain.synnefo.org")
1404
        invalid_scheme = weblogin("customscheme://localhost")
1405
        invalid_scheme_with_valid_domain = \
1406
            weblogin("http://www.invaliddomain.com")
1407
        valid_scheme = weblogin("pithos://localhost/")
1408
        # to be used in assertRedirects
1409
        valid_scheme_quoted = weblogin_quoted("pithos://localhost/")
1410

    
1411
        # not authenticated, redirects to login which contains next param with
1412
        # additional nested quoted next params
1413
        r = self.client.get(valid_scheme, follow=True)
1414
        login_redirect = reverse('login') + '?next=' + \
1415
            urllib.quote_plus("http://testserver" + valid_scheme_quoted)
1416
        self.assertRedirects(r, login_redirect)
1417

    
1418
        # authenticate client
1419
        self.client.login(username="user1@synnefo.org", password="password")
1420

    
1421
        # valid scheme
1422
        r = self.client.get(valid_scheme, follow=True)
1423
        url = r.redirect_chain[1][0]
1424
        # scheme preserved
1425
        self.assertTrue(url.startswith('pithos://localhost/'))
1426
        # redirect contains token param
1427
        params = urlparse.urlparse(url.replace('pithos', 'https'),
1428
                                   scheme='https').query
1429
        params = urlparse.parse_qs(params)
1430
        self.assertEqual(params['token'][0],
1431
                         AstakosUser.objects.get().auth_token)
1432
        # does not contain uuid
1433
        # reverted for 0.14.2 to support old pithos desktop clients
1434
        #self.assertFalse('uuid' in params)
1435

    
1436
        # invalid cases
1437
        r = self.client.get(invalid_scheme, follow=True)
1438
        self.assertEqual(r.status_code, 403)
1439

    
1440
        r = self.client.get(invalid_scheme_with_valid_domain, follow=True)
1441
        self.assertEqual(r.status_code, 403)
1442

    
1443
        r = self.client.get(invalid_domain, follow=True)
1444
        self.assertEqual(r.status_code, 403)