Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ fb4ef6eb

History | View | Annotate | Download (60.7 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import urlparse
35
import urllib
36

    
37
from astakos.im.tests.common import *
38

    
39
ui_url = lambda url: '/' + astakos_settings.BASE_PATH + '/ui/%s' % url
40

    
41

    
42
class ShibbolethTests(TestCase):
43
    """
44
    Testing shibboleth authentication.
45
    """
46

    
47
    def setUp(self):
48
        self.client = ShibbolethClient()
49
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
50
        astakos_settings.MODERATION_ENABLED = True
51

    
52
    def tearDown(self):
53
        AstakosUser.objects.all().delete()
54

    
55
    @im_settings(FORCE_PROFILE_UPDATE=False)
56
    def test_create_account(self):
57

    
58
        client = ShibbolethClient()
59

    
60
        # shibboleth views validation
61
        # eepn required
62
        r = client.get(ui_url('login/shibboleth?'), follow=True)
63
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
64
            'domain': astakos_settings.BASE_HOST,
65
            'contact_email': settings.CONTACT_EMAIL
66
        })
67
        client.set_tokens(eppn="kpapeppn")
68

    
69
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
70
        # shibboleth user info required
71
        r = client.get(ui_url('login/shibboleth?'), follow=True)
72
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
73
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
74

    
75
        # shibboleth logged us in
76
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
77
                          cn="Kostas Papadimitriou",
78
                          ep_affiliation="Test Affiliation")
79
        r = client.get(ui_url('login/shibboleth?'), follow=True,
80
                       **{'HTTP_SHIB_CUSTOM_IDP_KEY': 'test'})
81
        token = PendingThirdPartyUser.objects.get().token
82
        self.assertRedirects(r, ui_url('signup?third_party_token=%s' % token))
83
        self.assertEqual(r.status_code, 200)
84

    
85
        # a new pending user created
86
        pending_user = PendingThirdPartyUser.objects.get(
87
            third_party_identifier="kpapeppn")
88
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
89
        # keep the token for future use
90
        token = pending_user.token
91
        # from now on no shibboleth headers are sent to the server
92
        client.reset_tokens()
93

    
94
        # this is the old way, it should fail, to avoid pending user take over
95
        r = client.get(ui_url('shibboleth/signup/%s' % pending_user.username))
96
        self.assertEqual(r.status_code, 404)
97

    
98
        # this is the signup unique url associated with the pending user
99
        # created
100
        r = client.get(ui_url('signup/?third_party_token=%s' % token))
101
        identifier = pending_user.third_party_identifier
102
        post_data = {'third_party_identifier': identifier,
103
                     'first_name': 'Kostas',
104
                     'third_party_token': token,
105
                     'last_name': 'Mitroglou',
106
                     'provider': 'shibboleth'}
107

    
108
        signup_url = reverse('signup')
109

    
110
        # invlid email
111
        post_data['email'] = 'kpap'
112
        r = client.post(signup_url, post_data)
113
        self.assertContains(r, token)
114

    
115
        # existing email
116
        existing_user = get_local_user('test@test.com')
117
        post_data['email'] = 'test@test.com'
118
        r = client.post(signup_url, post_data)
119
        self.assertContains(r, messages.EMAIL_USED)
120
        existing_user.delete()
121

    
122
        # and finally a valid signup
123
        post_data['email'] = 'kpap@synnefo.org'
124
        r = client.post(signup_url, post_data, follow=True)
125
        self.assertContains(r, messages.VERIFICATION_SENT)
126

    
127
        # entires commited as expected
128
        self.assertEqual(AstakosUser.objects.count(), 1)
129
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
130
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
131

    
132
        user = AstakosUser.objects.get()
133
        provider = user.get_auth_provider("shibboleth")
134
        headers = provider.provider_details['info']['headers']
135
        self.assertEqual(headers.get('SHIB_CUSTOM_IDP_KEY'), 'test')
136

    
137
        # provider info stored
138
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
139
        self.assertEqual(provider.affiliation, 'Test Affiliation')
140
        self.assertEqual(provider.info['email'], u'kpap@synnefo.org')
141
        self.assertEqual(provider.info['eppn'], u'kpapeppn')
142
        self.assertEqual(provider.info['name'], u'Kostas Papadimitriou')
143
        self.assertTrue('headers' in provider.info)
144

    
145
        # login (not activated yet)
146
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
147
                          cn="Kostas Papadimitriou")
148
        r = client.get(ui_url("login/shibboleth?"), follow=True)
149
        self.assertContains(r, 'is pending moderation')
150

    
151
        # admin activates the user
152
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
153
        backend = activation_backends.get_backend()
154
        activation_result = backend.verify_user(u, u.verification_code)
155
        activation_result = backend.accept_user(u)
156
        self.assertFalse(activation_result.is_error())
157
        backend.send_result_notifications(activation_result, u)
158
        self.assertEqual(u.is_active, True)
159

    
160
        # we see our profile
161
        r = client.get(ui_url("login/shibboleth?"), follow=True)
162
        self.assertRedirects(r, ui_url('landing'))
163
        self.assertEqual(r.status_code, 200)
164

    
165
    def test_existing(self):
166
        """
167
        Test adding of third party login to an existing account
168
        """
169

    
170
        # this is our existing user
171
        existing_user = get_local_user('kpap@synnefo.org')
172
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
173
        existing_inactive.is_active = False
174
        existing_inactive.save()
175

    
176
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
177
        existing_unverified.is_active = False
178
        existing_unverified.activation_sent = None
179
        existing_unverified.email_verified = False
180
        existing_unverified.is_verified = False
181
        existing_unverified.save()
182

    
183
        client = ShibbolethClient()
184
        # shibboleth logged us in, notice that we use different email
185
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
186
                          cn="Kostas Papadimitriou", )
187
        r = client.get(ui_url("login/shibboleth?"), follow=True)
188

    
189
        # a new pending user created
190
        pending_user = PendingThirdPartyUser.objects.get()
191
        token = pending_user.token
192
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
193
        pending_key = pending_user.token
194
        client.reset_tokens()
195
        self.assertRedirects(r, ui_url("signup?third_party_token=%s" % token))
196

    
197
        form = r.context['login_form']
198
        signupdata = copy.copy(form.initial)
199
        signupdata['email'] = 'kpap@synnefo.org'
200
        signupdata['third_party_token'] = token
201
        signupdata['provider'] = 'shibboleth'
202
        signupdata.pop('id', None)
203

    
204
        # the email exists to another user
205
        r = client.post(ui_url("signup"), signupdata)
206
        self.assertContains(r, "There is already an account with this email "
207
                               "address")
208
        # change the case, still cannot create
209
        signupdata['email'] = 'KPAP@synnefo.org'
210
        r = client.post(ui_url("signup"), signupdata)
211
        self.assertContains(r, "There is already an account with this email "
212
                               "address")
213
        # inactive user
214
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
215
        r = client.post(ui_url("signup"), signupdata)
216
        self.assertContains(r, "There is already an account with this email "
217
                               "address")
218

    
219
        # unverified user, this should pass, old entry will be deleted
220
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
221
        r = client.post(ui_url("signup"), signupdata)
222

    
223
        post_data = {'password': 'password',
224
                     'username': 'kpap@synnefo.org'}
225
        r = client.post(ui_url('local'), post_data, follow=True)
226
        self.assertTrue(r.context['request'].user.is_authenticated())
227
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
228
                          cn="Kostas Papadimitriou", )
229
        r = client.get(ui_url("login/shibboleth?"), follow=True)
230
        self.assertContains(r, "enabled for this account")
231
        client.reset_tokens()
232

    
233
        user = existing_user
234
        self.assertTrue(user.has_auth_provider('shibboleth'))
235
        self.assertTrue(user.has_auth_provider('local',
236
                                               auth_backend='astakos'))
237
        client.logout()
238

    
239
        # look Ma, i can login with both my shibboleth and local account
240
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
241
                          cn="Kostas Papadimitriou")
242
        r = client.get(ui_url("login/shibboleth?"), follow=True)
243
        self.assertTrue(r.context['request'].user.is_authenticated())
244
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
245
        self.assertRedirects(r, ui_url('landing'))
246
        self.assertEqual(r.status_code, 200)
247

    
248
        user = r.context['request'].user
249
        client.logout()
250
        client.reset_tokens()
251

    
252
        # logged out
253
        r = client.get(ui_url("profile"), follow=True)
254
        self.assertFalse(r.context['request'].user.is_authenticated())
255

    
256
        # login with local account also works
257
        post_data = {'password': 'password',
258
                     'username': 'kpap@synnefo.org'}
259
        r = self.client.post(ui_url('local'), post_data, follow=True)
260
        self.assertTrue(r.context['request'].user.is_authenticated())
261
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
262
        self.assertRedirects(r, ui_url('landing'))
263
        self.assertEqual(r.status_code, 200)
264

    
265
        # cannot add the same eppn
266
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
267
                          cn="Kostas Papadimitriou", )
268
        r = client.get(ui_url("login/shibboleth?"), follow=True)
269
        self.assertRedirects(r, ui_url('landing'))
270
        self.assertTrue(r.status_code, 200)
271
        self.assertEquals(existing_user.auth_providers.count(), 2)
272

    
273
        # only one allowed by default
274
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
275
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
276
        prov = auth_providers.get_provider('shibboleth')
277
        r = client.get(ui_url("login/shibboleth?"), follow=True)
278
        self.assertContains(r, "Failed to add")
279
        self.assertRedirects(r, ui_url('profile'))
280
        self.assertTrue(r.status_code, 200)
281
        self.assertEquals(existing_user.auth_providers.count(), 2)
282
        client.logout()
283
        client.reset_tokens()
284

    
285
        # cannot login with another eppn
286
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppninvalid",
287
                          cn="Kostas Papadimitriou")
288
        r = client.get(ui_url("login/shibboleth?"), follow=True)
289
        self.assertFalse(r.context['request'].user.is_authenticated())
290

    
291
        # cannot
292

    
293
        # lets remove local password
294
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
295
                                       email="kpap@synnefo.org")
296
        remove_local_url = user.get_auth_provider('local').get_remove_url
297
        remove_shibbo_url = user.get_auth_provider('shibboleth',
298
                                                   'kpapeppn').get_remove_url
299
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
300
                          cn="Kostas Papadimtriou")
301
        r = client.get(ui_url("login/shibboleth?"), follow=True)
302
        client.reset_tokens()
303

    
304
        # only POST is allowed (for CSRF protection)
305
        r = client.get(remove_local_url, follow=True)
306
        self.assertEqual(r.status_code, 405)
307

    
308
        r = client.post(remove_local_url, follow=True)
309
        # 2 providers left
310
        self.assertEqual(user.auth_providers.count(), 1)
311
        # cannot remove last provider
312
        r = client.post(remove_shibbo_url)
313
        self.assertEqual(r.status_code, 403)
314
        self.client.logout()
315

    
316
        # cannot login using local credentials (notice we use another client)
317
        post_data = {'password': 'password',
318
                     'username': 'kpap@synnefo.org'}
319
        r = self.client.post(ui_url('local'), post_data, follow=True)
320
        self.assertFalse(r.context['request'].user.is_authenticated())
321

    
322
        # we can reenable the local provider by setting a password
323
        r = client.get(ui_url("password_change"), follow=True)
324
        r = client.post(ui_url("password_change"), {'new_password1': '111',
325
                                                'new_password2': '111'},
326
                        follow=True)
327
        user = r.context['request'].user
328
        self.assertTrue(user.has_auth_provider('local'))
329
        self.assertTrue(user.has_auth_provider('shibboleth'))
330
        self.assertTrue(user.check_password('111'))
331
        self.assertTrue(user.has_usable_password())
332

    
333
        # change password via profile form
334
        r = client.post(ui_url("profile"), {
335
            'old_password': '111',
336
            'new_password': '',
337
            'new_password2': '',
338
            'change_password': 'on',
339
        }, follow=False)
340
        self.assertEqual(r.status_code, 200)
341
        self.assertFalse(r.context['profile_form'].is_valid())
342

    
343
        self.client.logout()
344

    
345
        # now we can login
346
        post_data = {'password': '111',
347
                     'username': 'kpap@synnefo.org'}
348
        r = self.client.post(ui_url('local'), post_data, follow=True)
349
        self.assertTrue(r.context['request'].user.is_authenticated())
350

    
351
        client.reset_tokens()
352

    
353
        # we cannot take over another shibboleth identifier
354
        user2 = get_local_user('another@synnefo.org')
355
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
356
        # login
357
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
358
                          cn="Kostas Papadimitriou")
359
        r = client.get(ui_url("login/shibboleth?"), follow=True)
360
        # try to assign existing shibboleth identifier of another user
361
        client.set_tokens(mail="kpap_second@shibboleth.gr",
362
                          eppn="existingeppn", cn="Kostas Papadimitriou")
363
        r = client.get(ui_url("login/shibboleth?"), follow=True)
364
        self.assertContains(r, "is already in use")
365

    
366

    
367
class TestLocal(TestCase):
368

    
369
    def setUp(self):
370
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
371
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
372
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
373
        settings.ASTAKOS_MODERATION_ENABLED = True
374

    
375
    def tearDown(self):
376
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
377
        AstakosUser.objects.all().delete()
378

    
379
    def test_no_moderation(self):
380
        # disable moderation
381
        astakos_settings.MODERATION_ENABLED = False
382

    
383
        # create a new user
384
        r = self.client.get(ui_url("signup"))
385
        self.assertEqual(r.status_code, 200)
386
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
387
                'password2': 'password', 'first_name': 'Kostas',
388
                'last_name': 'Mitroglou', 'provider': 'local'}
389
        r = self.client.post(ui_url("signup"), data)
390

    
391
        # user created
392
        self.assertEqual(AstakosUser.objects.count(), 1)
393
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
394
                                       email="kpap@synnefo.org")
395
        self.assertEqual(user.username, 'kpap@synnefo.org')
396
        self.assertEqual(user.has_auth_provider('local'), True)
397
        self.assertFalse(user.is_active)
398

    
399
        # user (but not admin) gets notified
400
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
401
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
402
        astakos_settings.MODERATION_ENABLED = True
403

    
404
    def test_email_case(self):
405
        data = {
406
            'email': 'kPap@synnefo.org',
407
            'password1': '1234',
408
            'password2': '1234'
409
        }
410

    
411
        form = forms.LocalUserCreationForm(data)
412
        self.assertTrue(form.is_valid())
413
        user = form.save()
414
        form.store_user(user, {})
415

    
416
        u = AstakosUser.objects.get()
417
        self.assertEqual(u.email, 'kPap@synnefo.org')
418
        self.assertEqual(u.username, 'kpap@synnefo.org')
419
        u.is_active = True
420
        u.email_verified = True
421
        u.save()
422

    
423
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
424
        login = forms.LoginForm(data=data)
425
        self.assertTrue(login.is_valid())
426

    
427
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
428
        login = forms.LoginForm(data=data)
429
        self.assertTrue(login.is_valid())
430

    
431
        data = {
432
            'email': 'kpap@synnefo.org',
433
            'password1': '1234',
434
            'password2': '1234'
435
        }
436
        form = forms.LocalUserCreationForm(data)
437
        self.assertFalse(form.is_valid())
438

    
439
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
440
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
441
    def test_local_provider(self):
442
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
443

    
444
        # create a user
445
        r = self.client.get(ui_url("signup"))
446
        self.assertEqual(r.status_code, 200)
447
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
448
                'password2': 'password', 'first_name': 'Kostas',
449
                'last_name': 'Mitroglou', 'provider': 'local'}
450
        r = self.client.post(ui_url("signup"), data)
451

    
452
        # user created
453
        self.assertEqual(AstakosUser.objects.count(), 1)
454
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
455
                                       email="kpap@synnefo.org")
456
        self.assertEqual(user.username, 'kpap@synnefo.org')
457
        self.assertEqual(user.has_auth_provider('local'), True)
458
        self.assertFalse(user.is_active)  # not activated
459
        self.assertFalse(user.email_verified)  # not verified
460
        self.assertTrue(user.activation_sent)  # activation automatically sent
461
        self.assertFalse(user.moderated)
462
        self.assertFalse(user.email_verified)
463

    
464
        # admin gets notified and activates the user from the command line
465
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
466
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
467
                                           'password': 'password'},
468
                             follow=True)
469
        self.assertContains(r, messages.VERIFICATION_SENT)
470
        backend = activation_backends.get_backend()
471

    
472
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
473
        backend.send_user_verification_email(user)
474

    
475
        # user activation fields updated and user gets notified via email
476
        user = AstakosUser.objects.get(pk=user.pk)
477
        self.assertTrue(user.activation_sent)
478
        self.assertFalse(user.email_verified)
479
        self.assertFalse(user.is_active)
480
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
481

    
482
        # user forgot she got registered and tries to submit registration
483
        # form. Notice the upper case in email
484
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
485
                'password2': 'password', 'first_name': 'Kostas',
486
                'last_name': 'Mitroglou', 'provider': 'local'}
487
        r = self.client.post(ui_url("signup"), data, follow=True)
488
        self.assertRedirects(r, reverse('login'))
489
        self.assertContains(r, messages.VERIFICATION_SENT)
490

    
491
        user = AstakosUser.objects.get()
492
        # previous user replaced
493
        self.assertTrue(user.activation_sent)
494
        self.assertFalse(user.email_verified)
495
        self.assertFalse(user.is_active)
496
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
497

    
498
        # hmmm, email exists; lets request a password change
499
        r = self.client.get(ui_url('local/password_reset'))
500
        self.assertEqual(r.status_code, 200)
501
        data = {'email': 'kpap@synnefo.org'}
502
        r = self.client.post(ui_url('local/password_reset'), data, follow=True)
503
        # she can't because account is not active yet
504
        self.assertContains(r, 'pending activation')
505

    
506
        # moderation is enabled and an activation email has already been sent
507
        # so user can trigger resend of the activation email
508
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
509
                            follow=True)
510
        self.assertContains(r, 'has been sent to your email address.')
511
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
512

    
513
        # also she cannot login
514
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
515
        r = self.client.post(ui_url('local'), data, follow=True)
516
        self.assertContains(r, 'Resend activation')
517
        self.assertFalse(r.context['request'].user.is_authenticated())
518
        self.assertFalse('_pithos2_a' in self.client.cookies)
519

    
520
        # user sees the message and resends activation
521
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
522
                            follow=True)
523
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
524

    
525
        # logged in user cannot activate another account
526
        tmp_user = get_local_user("test_existing_user@synnefo.org")
527
        tmp_client = Client()
528
        tmp_client.login(username="test_existing_user@synnefo.org",
529
                         password="password")
530
        r = tmp_client.get(user.get_activation_url(), follow=True)
531
        self.assertContains(r, messages.LOGGED_IN_WARNING)
532

    
533
        # empty activation code is not allowed
534
        r = self.client.get(user.get_activation_url().split("?")[0],
535
                            follow=True)
536
        self.assertEqual(r.status_code, 403)
537

    
538
        r = self.client.get(user.get_activation_url(), follow=True)
539
        # previous code got invalidated
540
        self.assertEqual(r.status_code, 404)
541

    
542
        user = AstakosUser.objects.get(pk=user.pk)
543
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
544
        r = self.client.get(user.get_activation_url(), follow=True)
545
        self.assertRedirects(r, reverse('login'))
546
        # user sees that account is pending approval from admins
547
        self.assertContains(r, messages.NOTIFICATION_SENT)
548
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
549

    
550
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
551
        result = backend.handle_moderation(user)
552
        backend.send_result_notifications(result, user)
553
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
554
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
555

    
556
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
557
        r = self.client.get(ui_url('profile'), follow=True)
558
        self.assertFalse(r.context['request'].user.is_authenticated())
559
        self.assertFalse('_pithos2_a' in self.client.cookies)
560
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
561

    
562
        user = AstakosUser.objects.get(pk=user.pk)
563
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
564
                                               'password': 'password'},
565
                             follow=True)
566
        # user activated and logged in, token cookie set
567
        self.assertTrue(r.context['request'].user.is_authenticated())
568
        self.assertTrue('_pithos2_a' in self.client.cookies)
569
        cookies = self.client.cookies
570
        self.assertTrue(quote(user.auth_token) in
571
                        cookies.get('_pithos2_a').value)
572
        r = self.client.get(ui_url('logout'), follow=True)
573
        r = self.client.get(ui_url(''), follow=True)
574
        self.assertRedirects(r, ui_url('login'))
575
        # user logged out, token cookie removed
576
        self.assertFalse(r.context['request'].user.is_authenticated())
577
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
578

    
579
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
580
        del self.client.cookies['_pithos2_a']
581

    
582
        # user can login
583
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
584
                                               'password': 'password'},
585
                             follow=True)
586
        self.assertTrue(r.context['request'].user.is_authenticated())
587
        self.assertTrue('_pithos2_a' in self.client.cookies)
588
        cookies = self.client.cookies
589
        self.assertTrue(quote(user.auth_token) in
590
                        cookies.get('_pithos2_a').value)
591
        self.client.get(ui_url('logout'), follow=True)
592

    
593
        # user forgot password
594
        old_pass = user.password
595
        r = self.client.get(ui_url('local/password_reset'))
596
        self.assertEqual(r.status_code, 200)
597
        r = self.client.post(ui_url('local/password_reset'),
598
                             {'email': 'kpap@synnefo.org'})
599
        self.assertEqual(r.status_code, 302)
600
        # email sent
601
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
602

    
603
        # user visits change password link
604
        user = AstakosUser.objects.get(pk=user.pk)
605
        r = self.client.get(user.get_password_reset_url())
606
        r = self.client.post(user.get_password_reset_url(),
607
                             {'new_password1': 'newpass',
608
                              'new_password2': 'newpass'})
609

    
610
        user = AstakosUser.objects.get(pk=user.pk)
611
        self.assertNotEqual(old_pass, user.password)
612

    
613
        # old pass is not usable
614
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
615
                                               'password': 'password'})
616
        self.assertContains(r, 'Please enter a correct username and password')
617
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
618
                                               'password': 'newpass'},
619
                             follow=True)
620
        self.assertTrue(r.context['request'].user.is_authenticated())
621
        self.client.logout()
622

    
623
        # tests of special local backends
624
        user = AstakosUser.objects.get(pk=user.pk)
625
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
626
        user.save()
627

    
628
        # non astakos local backends do not support password reset
629
        r = self.client.get(ui_url('local/password_reset'))
630
        self.assertEqual(r.status_code, 200)
631
        r = self.client.post(ui_url('local/password_reset'),
632
                             {'email': 'kpap@synnefo.org'})
633
        # she can't because account is not active yet
634
        self.assertContains(r, "Changing password is not")
635

    
636

    
637
class UserActionsTests(TestCase):
638

    
639
    def test_email_change(self):
640
        # to test existing email validation
641
        get_local_user('existing@synnefo.org')
642

    
643
        # local user
644
        user = get_local_user('kpap@synnefo.org')
645

    
646
        # login as kpap
647
        self.client.login(username='kpap@synnefo.org', password='password')
648
        r = self.client.get(ui_url('profile'), follow=True)
649
        user = r.context['request'].user
650
        self.assertTrue(user.is_authenticated())
651

    
652
        # change email is enabled
653
        r = self.client.get(ui_url('email_change'))
654
        self.assertEqual(r.status_code, 200)
655
        self.assertFalse(user.email_change_is_pending())
656

    
657
        # request email change to an existing email fails
658
        data = {'new_email_address': 'existing@synnefo.org'}
659
        r = self.client.post(ui_url('email_change'), data)
660
        self.assertContains(r, messages.EMAIL_USED)
661

    
662
        # proper email change
663
        data = {'new_email_address': 'kpap@gmail.com'}
664
        r = self.client.post(ui_url('email_change'), data, follow=True)
665
        self.assertRedirects(r, ui_url('profile'))
666
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
667
        change1 = EmailChange.objects.get()
668

    
669
        # user sees a warning
670
        r = self.client.get(ui_url('email_change'))
671
        self.assertEqual(r.status_code, 200)
672
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
673
        self.assertTrue(user.email_change_is_pending())
674

    
675
        # link was sent
676
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
677
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
678

    
679
        # proper email change
680
        data = {'new_email_address': 'kpap@yahoo.com'}
681
        r = self.client.post(ui_url('email_change'), data, follow=True)
682
        self.assertRedirects(r, ui_url('profile'))
683
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
684
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
685
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
686
        change2 = EmailChange.objects.get()
687

    
688
        r = self.client.get(change1.get_url())
689
        self.assertEquals(r.status_code, 404)
690
        self.client.logout()
691

    
692
        invalid_client = Client()
693
        r = invalid_client.post(ui_url('local?'),
694
                                {'username': 'existing@synnefo.org',
695
                                 'password': 'password'})
696
        r = invalid_client.get(change2.get_url(), follow=True)
697
        self.assertEquals(r.status_code, 403)
698

    
699
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
700
                             {'username': 'kpap@synnefo.org',
701
                              'password': 'password',
702
                              'next': change2.get_url()},
703
                             follow=True)
704
        self.assertRedirects(r, ui_url('profile'))
705
        user = r.context['request'].user
706
        self.assertEquals(user.email, 'kpap@yahoo.com')
707
        self.assertEquals(user.username, 'kpap@yahoo.com')
708

    
709
        self.client.logout()
710
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
711
                             {'username': 'kpap@synnefo.org',
712
                              'password': 'password',
713
                              'next': change2.get_url()},
714
                             follow=True)
715
        self.assertContains(r, "Please enter a correct username and password")
716
        self.assertEqual(user.emailchanges.count(), 0)
717

    
718
        AstakosUser.objects.all().delete()
719
        Group.objects.all().delete()
720

    
721

    
722
class TestAuthProviderViews(TestCase):
723

    
724
    def tearDown(self):
725
        AstakosUser.objects.all().delete()
726

    
727
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
728
                         AUTOMODERATE_POLICY=True)
729
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
730
                 FORCE_PROFILE_UPDATE=False)
731
    def test_user(self):
732
        Profile = AuthProviderPolicyProfile
733
        Pending = PendingThirdPartyUser
734
        User = AstakosUser
735

    
736
        User.objects.create(email="newuser@synnefo.org")
737
        get_local_user("olduser@synnefo.org")
738
        cl_olduser = ShibbolethClient()
739
        get_local_user("olduser2@synnefo.org")
740
        ShibbolethClient()
741
        cl_newuser = ShibbolethClient()
742
        cl_newuser2 = Client()
743

    
744
        academic_group, created = Group.objects.get_or_create(
745
            name='academic-login')
746
        academic_users = academic_group.user_set
747
        assert created
748
        policy_only_academic = Profile.objects.add_policy('academic_strict',
749
                                                          'shibboleth',
750
                                                          academic_group,
751
                                                          exclusive=True,
752
                                                          login=False,
753
                                                          add=False)
754

    
755
        # new academic user
756
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
757
        cl_newuser.set_tokens(eppn="newusereppn")
758
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
759
        pending = Pending.objects.get()
760
        identifier = pending.third_party_identifier
761
        signup_data = {'third_party_identifier': identifier,
762
                       'first_name': 'Academic',
763
                       'third_party_token': pending.token,
764
                       'last_name': 'New User',
765
                       'provider': 'shibboleth'}
766
        r = cl_newuser.post(ui_url('signup'), signup_data)
767
        self.assertContains(r, "This field is required", )
768
        signup_data['email'] = 'olduser@synnefo.org'
769
        r = cl_newuser.post(ui_url('signup'), signup_data)
770
        self.assertContains(r, "already an account with this email", )
771
        signup_data['email'] = 'newuser@synnefo.org'
772
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
773
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
774
        self.assertEqual(r.status_code, 404)
775
        newuser = User.objects.get(email="newuser@synnefo.org")
776
        activation_link = newuser.get_activation_url()
777
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
778

    
779
        # new non-academic user
780
        signup_data = {'first_name': 'Non Academic',
781
                       'last_name': 'New User',
782
                       'provider': 'local',
783
                       'password1': 'password',
784
                       'password2': 'password'}
785
        signup_data['email'] = 'olduser@synnefo.org'
786
        r = cl_newuser2.post(ui_url('signup'), signup_data)
787
        self.assertContains(r, 'There is already an account with this '
788
                               'email address')
789
        signup_data['email'] = 'newuser@synnefo.org'
790
        r = cl_newuser2.post(ui_url('signup/'), signup_data)
791
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
792
        r = self.client.get(activation_link, follow=True)
793
        self.assertEqual(r.status_code, 404)
794
        newuser = User.objects.get(email="newuser@synnefo.org")
795
        self.assertTrue(newuser.activation_sent)
796

    
797
        # activation sent, user didn't open verification url so additional
798
        # registrations invalidate the previous signups.
799
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
800
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
801
        pending = Pending.objects.get()
802
        identifier = pending.third_party_identifier
803
        signup_data = {'third_party_identifier': identifier,
804
                       'first_name': 'Academic',
805
                       'third_party_token': pending.token,
806
                       'last_name': 'New User',
807
                       'provider': 'shibboleth'}
808
        signup_data['email'] = 'newuser@synnefo.org'
809
        r = cl_newuser.post(ui_url('signup'), signup_data)
810
        self.assertEqual(r.status_code, 302)
811
        newuser = User.objects.get(email="newuser@synnefo.org")
812
        self.assertTrue(newuser.activation_sent)
813
        activation_link = newuser.get_activation_url()
814
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
815
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
816
        self.assertRedirects(r, ui_url('landing'))
817
        newuser = User.objects.get(email="newuser@synnefo.org")
818
        self.assertEqual(newuser.is_active, True)
819
        self.assertEqual(newuser.email_verified, True)
820
        cl_newuser.logout()
821

    
822
        # cannot reactivate if suspended
823
        newuser.is_active = False
824
        newuser.save()
825
        r = cl_newuser.get(newuser.get_activation_url())
826
        newuser = User.objects.get(email="newuser@synnefo.org")
827
        self.assertFalse(newuser.is_active)
828

    
829
        # release suspension
830
        newuser.is_active = True
831
        newuser.save()
832

    
833
        cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
834
        local = auth.get_provider('local', newuser)
835
        self.assertEqual(local.get_add_policy, False)
836
        self.assertEqual(local.get_login_policy, False)
837
        r = cl_newuser.get(local.get_add_url, follow=True)
838
        self.assertRedirects(r, ui_url('profile'))
839
        self.assertContains(r, 'disabled for your')
840

    
841
        cl_olduser.login(username='olduser@synnefo.org', password="password")
842
        r = cl_olduser.get(ui_url('profile'), follow=True)
843
        self.assertEqual(r.status_code, 200)
844
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
845
        self.assertContains(r, 'Your request is missing a unique token')
846
        cl_olduser.set_tokens(eppn="newusereppn")
847
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
848
        self.assertContains(r, 'already in use')
849
        cl_olduser.set_tokens(eppn="oldusereppn")
850
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
851
        self.assertContains(r, 'Academic login enabled for this account')
852

    
853
        user = User.objects.get(email="olduser@synnefo.org")
854
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
855
        local_provider = user.get_auth_provider('local')
856
        self.assertEqual(shib_provider.get_remove_policy, True)
857
        self.assertEqual(local_provider.get_remove_policy, True)
858

    
859
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
860
                                                          'shibboleth',
861
                                                          academic_group,
862
                                                          remove=False)
863
        user.groups.add(academic_group)
864
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
865
        local_provider = user.get_auth_provider('local')
866
        self.assertEqual(shib_provider.get_remove_policy, False)
867
        self.assertEqual(local_provider.get_remove_policy, True)
868
        self.assertEqual(local_provider.get_login_policy, False)
869

    
870
        cl_olduser.logout()
871
        login_data = {'username': 'olduser@synnefo.org',
872
                      'password': 'password'}
873
        r = cl_olduser.post(ui_url('local'), login_data, follow=True)
874
        self.assertContains(r, "login/shibboleth'>Academic login")
875
        Group.objects.all().delete()
876

    
877

    
878
class TestAuthProvidersAPI(TestCase):
879
    """
880
    Test auth_providers module API
881
    """
882

    
883
    def tearDown(self):
884
        Group.objects.all().delete()
885

    
886
    @im_settings(IM_MODULES=['local', 'shibboleth'])
887
    def test_create(self):
888
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
889
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
890

    
891
        module = 'shibboleth'
892
        identifier = 'SHIB_UUID'
893
        provider_params = {
894
            'affiliation': 'UNIVERSITY',
895
            'info': {'age': 27}
896
        }
897
        provider = auth.get_provider(module, user2, identifier,
898
                                     **provider_params)
899
        provider.add_to_user()
900
        provider = auth.get_provider(module, user, identifier,
901
                                     **provider_params)
902
        provider.add_to_user()
903
        user.email_verified = True
904
        user.save()
905
        self.assertRaises(Exception, provider.add_to_user)
906
        provider = user.get_auth_provider(module, identifier)
907
        self.assertEqual(user.get_auth_provider(
908
            module, identifier)._instance.info.get('age'), 27)
909

    
910
        module = 'local'
911
        identifier = None
912
        provider_params = {'auth_backend': 'ldap', 'info':
913
                          {'office': 'A1'}}
914
        provider = auth.get_provider(module, user, identifier,
915
                                     **provider_params)
916
        provider.add_to_user()
917
        self.assertFalse(provider.get_add_policy)
918
        self.assertRaises(Exception, provider.add_to_user)
919

    
920
        shib = user.get_auth_provider('shibboleth',
921
                                      'SHIB_UUID')
922
        self.assertTrue(shib.get_remove_policy)
923

    
924
        local = user.get_auth_provider('local')
925
        self.assertTrue(local.get_remove_policy)
926

    
927
        local.remove_from_user()
928
        self.assertFalse(shib.get_remove_policy)
929
        self.assertRaises(Exception, shib.remove_from_user)
930

    
931
        provider = user.get_auth_providers()[0]
932
        self.assertRaises(Exception, provider.add_to_user)
933

    
934
    @im_settings(IM_MODULES=['local', 'shibboleth'])
935
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
936
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
937
                                                 'group2'])
938
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
939
                        CREATION_GROUPS_POLICY=['localgroup-create',
940
                                                'group-create'])
941
    def test_add_groups(self):
942
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
943
        provider = auth.get_provider('shibboleth', user, 'test123')
944
        provider.add_to_user()
945
        user = AstakosUser.objects.get()
946
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
947
                              sorted([u'group1', u'group2', u'group-create']))
948

    
949
        local = auth.get_provider('local', user)
950
        local.add_to_user()
951
        provider = user.get_auth_provider('shibboleth')
952
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
953
        provider.remove_from_user()
954
        user = AstakosUser.objects.get()
955
        self.assertEqual(len(user.get_auth_providers()), 1)
956
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
957
                              sorted([u'group-create', u'localgroup']))
958

    
959
        local = user.get_auth_provider('local')
960
        self.assertRaises(Exception, local.remove_from_user)
961
        provider = auth.get_provider('shibboleth', user, 'test123')
962
        provider.add_to_user()
963
        user = AstakosUser.objects.get()
964
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
965
                              sorted([u'group-create', u'group1', u'group2',
966
                               u'localgroup']))
967
        Group.objects.all().delete()
968

    
969

    
970

    
971
    @im_settings(IM_MODULES=['local', 'shibboleth'])
972
    def test_policies(self):
973
        group_old, created = Group.objects.get_or_create(name='olduser')
974

    
975
        astakos_settings.MODERATION_ENABLED = True
976
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
977
            ['academic-user']
978
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
979
            ['google-user']
980

    
981
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
982
        user.groups.add(group_old)
983
        user.add_auth_provider('local')
984

    
985
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
986
        user2.add_auth_provider('shibboleth', identifier='shibid')
987

    
988
        user3 = AstakosUser.objects.create(email="kpap3@synnefo.org")
989
        user3.groups.add(group_old)
990
        user3.add_auth_provider('local')
991
        user3.add_auth_provider('shibboleth', identifier='1234')
992

    
993
        self.assertTrue(user2.groups.get(name='academic-user'))
994
        self.assertFalse(user2.groups.filter(name='olduser').count())
995

    
996
        local = auth_providers.get_provider('local')
997
        self.assertTrue(local.get_add_policy)
998

    
999
        academic_group = Group.objects.get(name='academic-user')
1000
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1001
                                                     academic_group,
1002
                                                     exclusive=True,
1003
                                                     add=False,
1004
                                                     login=False)
1005
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1006
                                                     academic_group,
1007
                                                     exclusive=True,
1008
                                                     login=False,
1009
                                                     add=False)
1010
        # no duplicate entry gets created
1011
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1012

    
1013
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1014
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1015
                                                     user2,
1016
                                                     remove=False)
1017
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1018

    
1019
        local = auth_providers.get_provider('local', user2)
1020
        google = auth_providers.get_provider('google', user2)
1021
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1022
        self.assertTrue(shibboleth.get_login_policy)
1023
        self.assertFalse(shibboleth.get_remove_policy)
1024
        self.assertFalse(local.get_add_policy)
1025
        self.assertFalse(local.get_add_policy)
1026
        self.assertFalse(google.get_add_policy)
1027

    
1028
        user2.groups.remove(Group.objects.get(name='academic-user'))
1029
        self.assertTrue(local.get_add_policy)
1030
        self.assertTrue(google.get_add_policy)
1031
        user2.groups.add(Group.objects.get(name='academic-user'))
1032

    
1033
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1034
                                                     user2,
1035
                                                     exclusive=True,
1036
                                                     add=True)
1037
        self.assertTrue(local.get_add_policy)
1038
        self.assertTrue(google.get_add_policy)
1039

    
1040
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1041
        self.assertFalse(local.get_automoderate_policy)
1042
        self.assertFalse(google.get_automoderate_policy)
1043
        self.assertTrue(shibboleth.get_automoderate_policy)
1044

    
1045
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1046
                  'GOOGLE_ADD_GROUPS_POLICY']:
1047
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1048

    
1049

    
1050
    @shibboleth_settings(CREATE_POLICY=True)
1051
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1052
    def test_create_http(self):
1053
        # this should be wrapped inside a transaction
1054
        user = AstakosUser(email="test@test.com")
1055
        user.save()
1056
        provider = auth_providers.get_provider('shibboleth', user,
1057
                                               'test@academia.test')
1058
        provider.add_to_user()
1059
        user.get_auth_provider('shibboleth', 'test@academia.test')
1060
        provider = auth_providers.get_provider('local', user)
1061
        provider.add_to_user()
1062
        user.get_auth_provider('local')
1063

    
1064
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1065
        user = AstakosUser(email="test2@test.com")
1066
        user.save()
1067
        provider = auth_providers.get_provider('shibboleth', user,
1068
                                               'test@shibboleth.com',
1069
                                               **{'info': {'name':
1070
                                                                'User Test'}})
1071
        self.assertFalse(provider.get_create_policy)
1072
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1073
        self.assertTrue(provider.get_create_policy)
1074
        academic = provider.add_to_user()
1075

    
1076
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1077
    @shibboleth_settings(LIMIT_POLICY=2)
1078
    def test_policies(self):
1079
        user = get_local_user('kpap@synnefo.org')
1080
        user.add_auth_provider('shibboleth', identifier='1234')
1081
        user.add_auth_provider('shibboleth', identifier='12345')
1082

    
1083
        # default limit is 1
1084
        local = user.get_auth_provider('local')
1085
        self.assertEqual(local.get_add_policy, False)
1086

    
1087
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1088
        academic = user.get_auth_provider('shibboleth',
1089
                                          identifier='1234')
1090
        self.assertEqual(academic.get_add_policy, False)
1091
        newacademic = auth_providers.get_provider('shibboleth', user,
1092
                                                  identifier='123456')
1093
        self.assertEqual(newacademic.get_add_policy, True)
1094
        user.add_auth_provider('shibboleth', identifier='123456')
1095
        self.assertEqual(academic.get_add_policy, False)
1096
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1097

    
1098
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1099
    @shibboleth_settings(LIMIT_POLICY=2)
1100
    def test_messages(self):
1101
        user = get_local_user('kpap@synnefo.org')
1102
        user.add_auth_provider('shibboleth', identifier='1234')
1103
        user.add_auth_provider('shibboleth', identifier='12345')
1104
        provider = auth_providers.get_provider('shibboleth')
1105
        self.assertEqual(provider.get_message('title'), 'Academic')
1106
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1107
        # regenerate messages cache
1108
        provider = auth_providers.get_provider('shibboleth')
1109
        self.assertEqual(provider.get_message('title'), 'New title')
1110
        self.assertEqual(provider.get_message('login_title'),
1111
                         'New title LOGIN')
1112
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1113
        self.assertEqual(provider.get_module_icon,
1114
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1115
        self.assertEqual(provider.get_module_medium_icon,
1116
                         settings.MEDIA_URL +
1117
                         'im/auth/icons-medium/shibboleth.png')
1118

    
1119
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1120
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1121
        self.assertEqual(provider.get_method_details_msg,
1122
                         'Account: 12345')
1123
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1124
        self.assertEqual(provider.get_method_details_msg,
1125
                         'Account: 1234')
1126

    
1127
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1128
        self.assertEqual(provider.get_not_active_msg,
1129
                         "'Academic login' is disabled.")
1130

    
1131
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1132
    @shibboleth_settings(LIMIT_POLICY=2)
1133
    def test_templates(self):
1134
        user = get_local_user('kpap@synnefo.org')
1135
        user.add_auth_provider('shibboleth', identifier='1234')
1136
        user.add_auth_provider('shibboleth', identifier='12345')
1137

    
1138
        provider = auth_providers.get_provider('shibboleth')
1139
        self.assertEqual(provider.get_template('login'),
1140
                         'im/auth/shibboleth_login.html')
1141
        provider = auth_providers.get_provider('google')
1142
        self.assertEqual(provider.get_template('login'),
1143
                         'im/auth/generic_login.html')
1144

    
1145

    
1146
class TestActivationBackend(TestCase):
1147

    
1148
    def setUp(self):
1149
        # dummy call to pass through logging middleware
1150
        self.client.get(ui_url(''))
1151

    
1152
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1153
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1154
    def test_policies(self):
1155
        backend = activation_backends.get_backend()
1156

    
1157
        # email matches RE_USER_EMAIL_PATTERNS
1158
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1159
                               is_active=False, email_verified=False)
1160
        backend.handle_verification(user1, user1.verification_code)
1161
        self.assertEqual(user1.accepted_policy, 'email')
1162

    
1163
        # manually moderated
1164
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1165
                               is_active=False, email_verified=False)
1166

    
1167
        backend.handle_verification(user2, user2.verification_code)
1168
        self.assertEqual(user2.moderated, False)
1169
        backend.handle_moderation(user2)
1170
        self.assertEqual(user2.moderated, True)
1171
        self.assertEqual(user2.accepted_policy, 'manual')
1172

    
1173
        # autoaccept due to provider automoderate policy
1174
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1175
                               is_active=False, email_verified=False)
1176
        user3.auth_providers.all().delete()
1177
        user3.add_auth_provider('shibboleth', identifier='shib123')
1178
        backend.handle_verification(user3, user3.verification_code)
1179
        self.assertEqual(user3.moderated, True)
1180
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1181

    
1182
    @im_settings(MODERATION_ENABLED=False,
1183
                 MANAGERS=(('Manager',
1184
                            'manager@synnefo.org'),),
1185
                 HELPDESK=(('Helpdesk',
1186
                            'helpdesk@synnefo.org'),),
1187
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1188
    def test_without_moderation(self):
1189
        backend = activation_backends.get_backend()
1190
        form = backend.get_signup_form('local')
1191
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1192

    
1193
        user_data = {
1194
            'email': 'kpap@synnefo.org',
1195
            'first_name': 'Kostas Papas',
1196
            'password1': '123',
1197
            'password2': '123'
1198
        }
1199
        form = backend.get_signup_form('local', user_data)
1200
        user = form.save(commit=False)
1201
        form.store_user(user)
1202
        self.assertEqual(user.is_active, False)
1203
        self.assertEqual(user.email_verified, False)
1204

    
1205
        # step one, registration
1206
        result = backend.handle_registration(user)
1207
        user = AstakosUser.objects.get()
1208
        self.assertEqual(user.is_active, False)
1209
        self.assertEqual(user.email_verified, False)
1210
        self.assertTrue(user.verification_code)
1211
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1212
        backend.send_result_notifications(result, user)
1213
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1214
        self.assertEqual(len(mail.outbox), 1)
1215

    
1216
        # step two, verify email (automatically
1217
        # moderates/accepts user, since moderation is disabled)
1218
        user = AstakosUser.objects.get()
1219
        valid_code = user.verification_code
1220

    
1221
        # test invalid code
1222
        result = backend.handle_verification(user, valid_code)
1223
        backend.send_result_notifications(result, user)
1224
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1225
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1226
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1227
        # verification + activated + greeting = 3
1228
        self.assertEqual(len(mail.outbox), 3)
1229
        user = AstakosUser.objects.get()
1230
        self.assertEqual(user.is_active, True)
1231
        self.assertEqual(user.moderated, True)
1232
        self.assertTrue(user.moderated_at)
1233
        self.assertEqual(user.email_verified, True)
1234
        self.assertTrue(user.activation_sent)
1235

    
1236
    @im_settings(MODERATION_ENABLED=True,
1237
                 MANAGERS=(('Manager',
1238
                            'manager@synnefo.org'),),
1239
                 HELPDESK=(('Helpdesk',
1240
                            'helpdesk@synnefo.org'),),
1241
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1242
    def test_with_moderation(self):
1243

    
1244
        backend = activation_backends.get_backend()
1245
        form = backend.get_signup_form('local')
1246
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1247

    
1248
        user_data = {
1249
            'email': 'kpap@synnefo.org',
1250
            'first_name': 'Kostas Papas',
1251
            'password1': '123',
1252
            'password2': '123'
1253
        }
1254
        form = backend.get_signup_form(provider='local',
1255
                                       initial_data=user_data)
1256
        user = form.save(commit=False)
1257
        form.store_user(user)
1258
        self.assertEqual(user.is_active, False)
1259
        self.assertEqual(user.email_verified, False)
1260

    
1261
        # step one, registration
1262
        result = backend.handle_registration(user)
1263
        user = AstakosUser.objects.get()
1264
        self.assertEqual(user.is_active, False)
1265
        self.assertEqual(user.email_verified, False)
1266
        self.assertTrue(user.verification_code)
1267
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1268
        backend.send_result_notifications(result, user)
1269
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1270
        self.assertEqual(len(mail.outbox), 1)
1271

    
1272
        # step two, verifying email
1273
        user = AstakosUser.objects.get()
1274
        valid_code = user.verification_code
1275
        invalid_code = user.verification_code + 'invalid'
1276

    
1277
        # test invalid code
1278
        result = backend.handle_verification(user, invalid_code)
1279
        self.assertEqual(result.status, backend.Result.ERROR)
1280
        backend.send_result_notifications(result, user)
1281
        user = AstakosUser.objects.get()
1282
        self.assertEqual(user.is_active, False)
1283
        self.assertEqual(user.moderated, False)
1284
        self.assertEqual(user.moderated_at, None)
1285
        self.assertEqual(user.email_verified, False)
1286
        self.assertTrue(user.activation_sent)
1287

    
1288
        # test valid code
1289
        user = AstakosUser.objects.get()
1290
        result = backend.handle_verification(user, valid_code)
1291
        backend.send_result_notifications(result, user)
1292
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1293
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1294
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1295
        self.assertEqual(len(mail.outbox), 2)
1296
        user = AstakosUser.objects.get()
1297
        self.assertEqual(user.moderated, False)
1298
        self.assertEqual(user.moderated_at, None)
1299
        self.assertEqual(user.email_verified, True)
1300
        self.assertTrue(user.activation_sent)
1301

    
1302
        # test code reuse
1303
        result = backend.handle_verification(user, valid_code)
1304
        self.assertEqual(result.status, backend.Result.ERROR)
1305
        user = AstakosUser.objects.get()
1306
        self.assertEqual(user.is_active, False)
1307
        self.assertEqual(user.moderated, False)
1308
        self.assertEqual(user.moderated_at, None)
1309
        self.assertEqual(user.email_verified, True)
1310
        self.assertTrue(user.activation_sent)
1311

    
1312
        # valid code on verified user
1313
        user = AstakosUser.objects.get()
1314
        valid_code = user.verification_code
1315
        result = backend.handle_verification(user, valid_code)
1316
        self.assertEqual(result.status, backend.Result.ERROR)
1317

    
1318
        # step three, moderation user
1319
        user = AstakosUser.objects.get()
1320
        result = backend.handle_moderation(user)
1321
        backend.send_result_notifications(result, user)
1322

    
1323
        user = AstakosUser.objects.get()
1324
        self.assertEqual(user.is_active, True)
1325
        self.assertEqual(user.moderated, True)
1326
        self.assertTrue(user.moderated_at)
1327
        self.assertEqual(user.email_verified, True)
1328
        self.assertTrue(user.activation_sent)
1329

    
1330

    
1331
class TestWebloginRedirect(TestCase):
1332

    
1333
    @with_settings(settings, COOKIE_DOMAIN='.astakos.synnefo.org')
1334
    def test_restricts_domains(self):
1335
        get_local_user('user1@synnefo.org')
1336

    
1337
        # next url construct helpers
1338
        weblogin = lambda nxt: reverse('weblogin') + '?next=%s' % nxt
1339
        weblogin_quoted = lambda nxt: reverse('weblogin') + '?next=%s' % \
1340
            urllib.quote_plus(nxt)
1341

    
1342
        # common cases
1343
        invalid_domain = weblogin("https://www.invaliddomain.synnefo.org")
1344
        invalid_scheme = weblogin("customscheme://localhost")
1345
        invalid_scheme_with_valid_domain = \
1346
                weblogin("http://www.invaliddomain.com")
1347
        valid_scheme = weblogin("pithos://localhost/")
1348
        # to be used in assertRedirects
1349
        valid_scheme_quoted = weblogin_quoted("pithos://localhost/")
1350

    
1351
        # not authenticated, redirects to login which contains next param with
1352
        # additional nested quoted next params
1353
        r = self.client.get(valid_scheme, follow=True)
1354
        login_redirect = reverse('login') + '?next=' + \
1355
            urllib.quote_plus("http://testserver" + valid_scheme_quoted)
1356
        self.assertRedirects(r, login_redirect)
1357

    
1358
        # authenticate client
1359
        self.client.login(username="user1@synnefo.org", password="password")
1360

    
1361
        # valid scheme
1362
        r = self.client.get(valid_scheme, follow=True)
1363
        url = r.redirect_chain[1][0]
1364
        # scheme preserved
1365
        self.assertTrue(url.startswith('pithos://localhost/'))
1366
        # redirect contains token param
1367
        params = urlparse.urlparse(url.replace('pithos', 'https'),
1368
                                   scheme='https').query
1369
        params = urlparse.parse_qs(params)
1370
        self.assertEqual(params['token'][0],
1371
                         AstakosUser.objects.get().auth_token)
1372
        # does not contain uuid
1373
        # reverted for 0.14.2 to support old pithos desktop clients
1374
        #self.assertFalse('uuid' in params)
1375

    
1376
        # invalid cases
1377
        r = self.client.get(invalid_scheme, follow=True)
1378
        self.assertEqual(r.status_code, 403)
1379

    
1380
        r = self.client.get(invalid_scheme_with_valid_domain, follow=True)
1381
        self.assertEqual(r.status_code, 403)
1382

    
1383
        r = self.client.get(invalid_domain, follow=True)
1384
        self.assertEqual(r.status_code, 403)