Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ 5dc53e99

History | View | Annotate | Download (60.4 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import urlparse
35
import urllib
36

    
37
from astakos.im.tests.common import *
38

    
39
ui_url = lambda url: '/' + astakos_settings.BASE_PATH + '/ui/%s' % url
40

    
41

    
42
class ShibbolethTests(TestCase):
43
    """
44
    Testing shibboleth authentication.
45
    """
46

    
47
    def setUp(self):
48
        self.client = ShibbolethClient()
49
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
50
        astakos_settings.MODERATION_ENABLED = True
51

    
52
    def tearDown(self):
53
        AstakosUser.objects.all().delete()
54

    
55
    @im_settings(FORCE_PROFILE_UPDATE=False)
56
    def test_create_account(self):
57

    
58
        client = ShibbolethClient()
59

    
60
        # shibboleth views validation
61
        # eepn required
62
        r = client.get(ui_url('login/shibboleth?'), follow=True)
63
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
64
            'domain': astakos_settings.BASE_HOST,
65
            'contact_email': settings.CONTACT_EMAIL
66
        })
67
        client.set_tokens(eppn="kpapeppn")
68

    
69
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
70
        # shibboleth user info required
71
        r = client.get(ui_url('login/shibboleth?'), follow=True)
72
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
73
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
74

    
75
        # shibboleth logged us in
76
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
77
                          cn="Kostas Papadimitriou",
78
                          ep_affiliation="Test Affiliation")
79
        r = client.get(ui_url('login/shibboleth?'), follow=True)
80
        token = PendingThirdPartyUser.objects.get().token
81
        self.assertRedirects(r, ui_url('signup?third_party_token=%s' % token))
82
        self.assertEqual(r.status_code, 200)
83

    
84
        # a new pending user created
85
        pending_user = PendingThirdPartyUser.objects.get(
86
            third_party_identifier="kpapeppn")
87
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
88
        # keep the token for future use
89
        token = pending_user.token
90
        # from now on no shibboleth headers are sent to the server
91
        client.reset_tokens()
92

    
93
        # this is the old way, it should fail, to avoid pending user take over
94
        r = client.get(ui_url('shibboleth/signup/%s' % pending_user.username))
95
        self.assertEqual(r.status_code, 404)
96

    
97
        # this is the signup unique url associated with the pending user
98
        # created
99
        r = client.get(ui_url('signup/?third_party_token=%s' % token))
100
        identifier = pending_user.third_party_identifier
101
        post_data = {'third_party_identifier': identifier,
102
                     'first_name': 'Kostas',
103
                     'third_party_token': token,
104
                     'last_name': 'Mitroglou',
105
                     'provider': 'shibboleth'}
106

    
107
        signup_url = reverse('signup')
108

    
109
        # invlid email
110
        post_data['email'] = 'kpap'
111
        r = client.post(signup_url, post_data)
112
        self.assertContains(r, token)
113

    
114
        # existing email
115
        existing_user = get_local_user('test@test.com')
116
        post_data['email'] = 'test@test.com'
117
        r = client.post(signup_url, post_data)
118
        self.assertContains(r, messages.EMAIL_USED)
119
        existing_user.delete()
120

    
121
        # and finally a valid signup
122
        post_data['email'] = 'kpap@synnefo.org'
123
        r = client.post(signup_url, post_data, follow=True)
124
        self.assertContains(r, messages.VERIFICATION_SENT)
125

    
126
        # entires commited as expected
127
        self.assertEqual(AstakosUser.objects.count(), 1)
128
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
129
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
130

    
131
        # provider info stored
132
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
133
        self.assertEqual(provider.affiliation, 'Test Affiliation')
134
        self.assertEqual(provider.info['email'], u'kpap@synnefo.org')
135
        self.assertEqual(provider.info['eppn'], u'kpapeppn')
136
        self.assertEqual(provider.info['name'], u'Kostas Papadimitriou')
137
        self.assertTrue('headers' in provider.info)
138

    
139
        # login (not activated yet)
140
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
141
                          cn="Kostas Papadimitriou")
142
        r = client.get(ui_url("login/shibboleth?"), follow=True)
143
        self.assertContains(r, 'is pending moderation')
144

    
145
        # admin activates the user
146
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
147
        backend = activation_backends.get_backend()
148
        activation_result = backend.verify_user(u, u.verification_code)
149
        activation_result = backend.accept_user(u)
150
        self.assertFalse(activation_result.is_error())
151
        backend.send_result_notifications(activation_result, u)
152
        self.assertEqual(u.is_active, True)
153

    
154
        # we see our profile
155
        r = client.get(ui_url("login/shibboleth?"), follow=True)
156
        self.assertRedirects(r, ui_url('landing'))
157
        self.assertEqual(r.status_code, 200)
158

    
159
    def test_existing(self):
160
        """
161
        Test adding of third party login to an existing account
162
        """
163

    
164
        # this is our existing user
165
        existing_user = get_local_user('kpap@synnefo.org')
166
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
167
        existing_inactive.is_active = False
168
        existing_inactive.save()
169

    
170
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
171
        existing_unverified.is_active = False
172
        existing_unverified.activation_sent = None
173
        existing_unverified.email_verified = False
174
        existing_unverified.is_verified = False
175
        existing_unverified.save()
176

    
177
        client = ShibbolethClient()
178
        # shibboleth logged us in, notice that we use different email
179
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
180
                          cn="Kostas Papadimitriou", )
181
        r = client.get(ui_url("login/shibboleth?"), follow=True)
182

    
183
        # a new pending user created
184
        pending_user = PendingThirdPartyUser.objects.get()
185
        token = pending_user.token
186
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
187
        pending_key = pending_user.token
188
        client.reset_tokens()
189
        self.assertRedirects(r, ui_url("signup?third_party_token=%s" % token))
190

    
191
        form = r.context['login_form']
192
        signupdata = copy.copy(form.initial)
193
        signupdata['email'] = 'kpap@synnefo.org'
194
        signupdata['third_party_token'] = token
195
        signupdata['provider'] = 'shibboleth'
196
        signupdata.pop('id', None)
197

    
198
        # the email exists to another user
199
        r = client.post(ui_url("signup"), signupdata)
200
        self.assertContains(r, "There is already an account with this email "
201
                               "address")
202
        # change the case, still cannot create
203
        signupdata['email'] = 'KPAP@synnefo.org'
204
        r = client.post(ui_url("signup"), signupdata)
205
        self.assertContains(r, "There is already an account with this email "
206
                               "address")
207
        # inactive user
208
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
209
        r = client.post(ui_url("signup"), signupdata)
210
        self.assertContains(r, "There is already an account with this email "
211
                               "address")
212

    
213
        # unverified user, this should pass, old entry will be deleted
214
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
215
        r = client.post(ui_url("signup"), signupdata)
216

    
217
        post_data = {'password': 'password',
218
                     'username': 'kpap@synnefo.org'}
219
        r = client.post(ui_url('local'), post_data, follow=True)
220
        self.assertTrue(r.context['request'].user.is_authenticated())
221
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
222
                          cn="Kostas Papadimitriou", )
223
        r = client.get(ui_url("login/shibboleth?"), follow=True)
224
        self.assertContains(r, "enabled for this account")
225
        client.reset_tokens()
226

    
227
        user = existing_user
228
        self.assertTrue(user.has_auth_provider('shibboleth'))
229
        self.assertTrue(user.has_auth_provider('local',
230
                                               auth_backend='astakos'))
231
        client.logout()
232

    
233
        # look Ma, i can login with both my shibboleth and local account
234
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
235
                          cn="Kostas Papadimitriou")
236
        r = client.get(ui_url("login/shibboleth?"), follow=True)
237
        self.assertTrue(r.context['request'].user.is_authenticated())
238
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
239
        self.assertRedirects(r, ui_url('landing'))
240
        self.assertEqual(r.status_code, 200)
241
        client.logout()
242
        client.reset_tokens()
243

    
244
        # logged out
245
        r = client.get(ui_url("profile"), follow=True)
246
        self.assertFalse(r.context['request'].user.is_authenticated())
247

    
248
        # login with local account also works
249
        post_data = {'password': 'password',
250
                     'username': 'kpap@synnefo.org'}
251
        r = self.client.post(ui_url('local'), post_data, follow=True)
252
        self.assertTrue(r.context['request'].user.is_authenticated())
253
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
254
        self.assertRedirects(r, ui_url('landing'))
255
        self.assertEqual(r.status_code, 200)
256

    
257
        # cannot add the same eppn
258
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
259
                          cn="Kostas Papadimitriou", )
260
        r = client.get(ui_url("login/shibboleth?"), follow=True)
261
        self.assertRedirects(r, ui_url('landing'))
262
        self.assertTrue(r.status_code, 200)
263
        self.assertEquals(existing_user.auth_providers.count(), 2)
264

    
265
        # only one allowed by default
266
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
267
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
268
        prov = auth_providers.get_provider('shibboleth')
269
        r = client.get(ui_url("login/shibboleth?"), follow=True)
270
        self.assertContains(r, "Failed to add")
271
        self.assertRedirects(r, ui_url('profile'))
272
        self.assertTrue(r.status_code, 200)
273
        self.assertEquals(existing_user.auth_providers.count(), 2)
274
        client.logout()
275
        client.reset_tokens()
276

    
277
        # cannot login with another eppn
278
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppninvalid",
279
                          cn="Kostas Papadimitriou")
280
        r = client.get(ui_url("login/shibboleth?"), follow=True)
281
        self.assertFalse(r.context['request'].user.is_authenticated())
282

    
283
        # cannot
284

    
285
        # lets remove local password
286
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
287
                                       email="kpap@synnefo.org")
288
        remove_local_url = user.get_auth_provider('local').get_remove_url
289
        remove_shibbo_url = user.get_auth_provider('shibboleth',
290
                                                   'kpapeppn').get_remove_url
291
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
292
                          cn="Kostas Papadimtriou")
293
        r = client.get(ui_url("login/shibboleth?"), follow=True)
294
        client.reset_tokens()
295

    
296
        # only POST is allowed (for CSRF protection)
297
        r = client.get(remove_local_url, follow=True)
298
        self.assertEqual(r.status_code, 405)
299

    
300
        r = client.post(remove_local_url, follow=True)
301
        # 2 providers left
302
        self.assertEqual(user.auth_providers.count(), 1)
303
        # cannot remove last provider
304
        r = client.post(remove_shibbo_url)
305
        self.assertEqual(r.status_code, 403)
306
        self.client.logout()
307

    
308
        # cannot login using local credentials (notice we use another client)
309
        post_data = {'password': 'password',
310
                     'username': 'kpap@synnefo.org'}
311
        r = self.client.post(ui_url('local'), post_data, follow=True)
312
        self.assertFalse(r.context['request'].user.is_authenticated())
313

    
314
        # we can reenable the local provider by setting a password
315
        r = client.get(ui_url("password_change"), follow=True)
316
        r = client.post(ui_url("password_change"), {'new_password1': '111',
317
                                                'new_password2': '111'},
318
                        follow=True)
319
        user = r.context['request'].user
320
        self.assertTrue(user.has_auth_provider('local'))
321
        self.assertTrue(user.has_auth_provider('shibboleth'))
322
        self.assertTrue(user.check_password('111'))
323
        self.assertTrue(user.has_usable_password())
324

    
325
        # change password via profile form
326
        r = client.post(ui_url("profile"), {
327
            'old_password': '111',
328
            'new_password': '',
329
            'new_password2': '',
330
            'change_password': 'on',
331
        }, follow=False)
332
        self.assertEqual(r.status_code, 200)
333
        self.assertFalse(r.context['profile_form'].is_valid())
334

    
335
        self.client.logout()
336

    
337
        # now we can login
338
        post_data = {'password': '111',
339
                     'username': 'kpap@synnefo.org'}
340
        r = self.client.post(ui_url('local'), post_data, follow=True)
341
        self.assertTrue(r.context['request'].user.is_authenticated())
342

    
343
        client.reset_tokens()
344

    
345
        # we cannot take over another shibboleth identifier
346
        user2 = get_local_user('another@synnefo.org')
347
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
348
        # login
349
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
350
                          cn="Kostas Papadimitriou")
351
        r = client.get(ui_url("login/shibboleth?"), follow=True)
352
        # try to assign existing shibboleth identifier of another user
353
        client.set_tokens(mail="kpap_second@shibboleth.gr",
354
                          eppn="existingeppn", cn="Kostas Papadimitriou")
355
        r = client.get(ui_url("login/shibboleth?"), follow=True)
356
        self.assertContains(r, "is already in use")
357

    
358

    
359
class TestLocal(TestCase):
360

    
361
    def setUp(self):
362
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
363
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
364
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
365
        settings.ASTAKOS_MODERATION_ENABLED = True
366

    
367
    def tearDown(self):
368
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
369
        AstakosUser.objects.all().delete()
370

    
371
    def test_no_moderation(self):
372
        # disable moderation
373
        astakos_settings.MODERATION_ENABLED = False
374

    
375
        # create a new user
376
        r = self.client.get(ui_url("signup"))
377
        self.assertEqual(r.status_code, 200)
378
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
379
                'password2': 'password', 'first_name': 'Kostas',
380
                'last_name': 'Mitroglou', 'provider': 'local'}
381
        r = self.client.post(ui_url("signup"), data)
382

    
383
        # user created
384
        self.assertEqual(AstakosUser.objects.count(), 1)
385
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
386
                                       email="kpap@synnefo.org")
387
        self.assertEqual(user.username, 'kpap@synnefo.org')
388
        self.assertEqual(user.has_auth_provider('local'), True)
389
        self.assertFalse(user.is_active)
390

    
391
        # user (but not admin) gets notified
392
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
393
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
394
        astakos_settings.MODERATION_ENABLED = True
395

    
396
    def test_email_case(self):
397
        data = {
398
            'email': 'kPap@synnefo.org',
399
            'password1': '1234',
400
            'password2': '1234'
401
        }
402

    
403
        form = forms.LocalUserCreationForm(data)
404
        self.assertTrue(form.is_valid())
405
        user = form.save()
406
        form.store_user(user, {})
407

    
408
        u = AstakosUser.objects.get()
409
        self.assertEqual(u.email, 'kPap@synnefo.org')
410
        self.assertEqual(u.username, 'kpap@synnefo.org')
411
        u.is_active = True
412
        u.email_verified = True
413
        u.save()
414

    
415
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
416
        login = forms.LoginForm(data=data)
417
        self.assertTrue(login.is_valid())
418

    
419
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
420
        login = forms.LoginForm(data=data)
421
        self.assertTrue(login.is_valid())
422

    
423
        data = {
424
            'email': 'kpap@synnefo.org',
425
            'password1': '1234',
426
            'password2': '1234'
427
        }
428
        form = forms.LocalUserCreationForm(data)
429
        self.assertFalse(form.is_valid())
430

    
431
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
432
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
433
    def test_local_provider(self):
434
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
435

    
436
        # create a user
437
        r = self.client.get(ui_url("signup"))
438
        self.assertEqual(r.status_code, 200)
439
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
440
                'password2': 'password', 'first_name': 'Kostas',
441
                'last_name': 'Mitroglou', 'provider': 'local'}
442
        r = self.client.post(ui_url("signup"), data)
443

    
444
        # user created
445
        self.assertEqual(AstakosUser.objects.count(), 1)
446
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
447
                                       email="kpap@synnefo.org")
448
        self.assertEqual(user.username, 'kpap@synnefo.org')
449
        self.assertEqual(user.has_auth_provider('local'), True)
450
        self.assertFalse(user.is_active)  # not activated
451
        self.assertFalse(user.email_verified)  # not verified
452
        self.assertTrue(user.activation_sent)  # activation automatically sent
453
        self.assertFalse(user.moderated)
454
        self.assertFalse(user.email_verified)
455

    
456
        # admin gets notified and activates the user from the command line
457
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
458
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
459
                                           'password': 'password'},
460
                             follow=True)
461
        self.assertContains(r, messages.VERIFICATION_SENT)
462
        backend = activation_backends.get_backend()
463

    
464
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
465
        backend.send_user_verification_email(user)
466

    
467
        # user activation fields updated and user gets notified via email
468
        user = AstakosUser.objects.get(pk=user.pk)
469
        self.assertTrue(user.activation_sent)
470
        self.assertFalse(user.email_verified)
471
        self.assertFalse(user.is_active)
472
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
473

    
474
        # user forgot she got registered and tries to submit registration
475
        # form. Notice the upper case in email
476
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
477
                'password2': 'password', 'first_name': 'Kostas',
478
                'last_name': 'Mitroglou', 'provider': 'local'}
479
        r = self.client.post(ui_url("signup"), data, follow=True)
480
        self.assertRedirects(r, reverse('login'))
481
        self.assertContains(r, messages.VERIFICATION_SENT)
482

    
483
        user = AstakosUser.objects.get()
484
        # previous user replaced
485
        self.assertTrue(user.activation_sent)
486
        self.assertFalse(user.email_verified)
487
        self.assertFalse(user.is_active)
488
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
489

    
490
        # hmmm, email exists; lets request a password change
491
        r = self.client.get(ui_url('local/password_reset'))
492
        self.assertEqual(r.status_code, 200)
493
        data = {'email': 'kpap@synnefo.org'}
494
        r = self.client.post(ui_url('local/password_reset'), data, follow=True)
495
        # she can't because account is not active yet
496
        self.assertContains(r, 'pending activation')
497

    
498
        # moderation is enabled and an activation email has already been sent
499
        # so user can trigger resend of the activation email
500
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
501
                            follow=True)
502
        self.assertContains(r, 'has been sent to your email address.')
503
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
504

    
505
        # also she cannot login
506
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
507
        r = self.client.post(ui_url('local'), data, follow=True)
508
        self.assertContains(r, 'Resend activation')
509
        self.assertFalse(r.context['request'].user.is_authenticated())
510
        self.assertFalse('_pithos2_a' in self.client.cookies)
511

    
512
        # user sees the message and resends activation
513
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
514
                            follow=True)
515
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
516

    
517
        # logged in user cannot activate another account
518
        tmp_user = get_local_user("test_existing_user@synnefo.org")
519
        tmp_client = Client()
520
        tmp_client.login(username="test_existing_user@synnefo.org",
521
                         password="password")
522
        r = tmp_client.get(user.get_activation_url(), follow=True)
523
        self.assertContains(r, messages.LOGGED_IN_WARNING)
524

    
525
        r = self.client.get(user.get_activation_url(), follow=True)
526
        # previous code got invalidated
527
        self.assertEqual(r.status_code, 404)
528

    
529
        user = AstakosUser.objects.get(pk=user.pk)
530
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
531
        r = self.client.get(user.get_activation_url(), follow=True)
532
        self.assertRedirects(r, reverse('login'))
533
        # user sees that account is pending approval from admins
534
        self.assertContains(r, messages.NOTIFICATION_SENT)
535
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
536

    
537
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
538
        result = backend.handle_moderation(user)
539
        backend.send_result_notifications(result, user)
540
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
541
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
542

    
543
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
544
        r = self.client.get(ui_url('profile'), follow=True)
545
        self.assertFalse(r.context['request'].user.is_authenticated())
546
        self.assertFalse('_pithos2_a' in self.client.cookies)
547
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
548

    
549
        user = AstakosUser.objects.get(pk=user.pk)
550
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
551
                                               'password': 'password'},
552
                             follow=True)
553
        # user activated and logged in, token cookie set
554
        self.assertTrue(r.context['request'].user.is_authenticated())
555
        self.assertTrue('_pithos2_a' in self.client.cookies)
556
        cookies = self.client.cookies
557
        self.assertTrue(quote(user.auth_token) in
558
                        cookies.get('_pithos2_a').value)
559
        r = self.client.get(ui_url('logout'), follow=True)
560
        r = self.client.get(ui_url(''), follow=True)
561
        self.assertRedirects(r, ui_url('login'))
562
        # user logged out, token cookie removed
563
        self.assertFalse(r.context['request'].user.is_authenticated())
564
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
565

    
566
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
567
        del self.client.cookies['_pithos2_a']
568

    
569
        # user can login
570
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
571
                                               'password': 'password'},
572
                             follow=True)
573
        self.assertTrue(r.context['request'].user.is_authenticated())
574
        self.assertTrue('_pithos2_a' in self.client.cookies)
575
        cookies = self.client.cookies
576
        self.assertTrue(quote(user.auth_token) in
577
                        cookies.get('_pithos2_a').value)
578
        self.client.get(ui_url('logout'), follow=True)
579

    
580
        # user forgot password
581
        old_pass = user.password
582
        r = self.client.get(ui_url('local/password_reset'))
583
        self.assertEqual(r.status_code, 200)
584
        r = self.client.post(ui_url('local/password_reset'),
585
                             {'email': 'kpap@synnefo.org'})
586
        self.assertEqual(r.status_code, 302)
587
        # email sent
588
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
589

    
590
        # user visits change password link
591
        user = AstakosUser.objects.get(pk=user.pk)
592
        r = self.client.get(user.get_password_reset_url())
593
        r = self.client.post(user.get_password_reset_url(),
594
                             {'new_password1': 'newpass',
595
                              'new_password2': 'newpass'})
596

    
597
        user = AstakosUser.objects.get(pk=user.pk)
598
        self.assertNotEqual(old_pass, user.password)
599

    
600
        # old pass is not usable
601
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
602
                                               'password': 'password'})
603
        self.assertContains(r, 'Please enter a correct username and password')
604
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
605
                                               'password': 'newpass'},
606
                             follow=True)
607
        self.assertTrue(r.context['request'].user.is_authenticated())
608
        self.client.logout()
609

    
610
        # tests of special local backends
611
        user = AstakosUser.objects.get(pk=user.pk)
612
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
613
        user.save()
614

    
615
        # non astakos local backends do not support password reset
616
        r = self.client.get(ui_url('local/password_reset'))
617
        self.assertEqual(r.status_code, 200)
618
        r = self.client.post(ui_url('local/password_reset'),
619
                             {'email': 'kpap@synnefo.org'})
620
        # she can't because account is not active yet
621
        self.assertContains(r, "Changing password is not")
622

    
623

    
624
class UserActionsTests(TestCase):
625

    
626
    def test_email_change(self):
627
        # to test existing email validation
628
        get_local_user('existing@synnefo.org')
629

    
630
        # local user
631
        user = get_local_user('kpap@synnefo.org')
632

    
633
        # login as kpap
634
        self.client.login(username='kpap@synnefo.org', password='password')
635
        r = self.client.get(ui_url('profile'), follow=True)
636
        user = r.context['request'].user
637
        self.assertTrue(user.is_authenticated())
638

    
639
        # change email is enabled
640
        r = self.client.get(ui_url('email_change'))
641
        self.assertEqual(r.status_code, 200)
642
        self.assertFalse(user.email_change_is_pending())
643

    
644
        # request email change to an existing email fails
645
        data = {'new_email_address': 'existing@synnefo.org'}
646
        r = self.client.post(ui_url('email_change'), data)
647
        self.assertContains(r, messages.EMAIL_USED)
648

    
649
        # proper email change
650
        data = {'new_email_address': 'kpap@gmail.com'}
651
        r = self.client.post(ui_url('email_change'), data, follow=True)
652
        self.assertRedirects(r, ui_url('profile'))
653
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
654
        change1 = EmailChange.objects.get()
655

    
656
        # user sees a warning
657
        r = self.client.get(ui_url('email_change'))
658
        self.assertEqual(r.status_code, 200)
659
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
660
        self.assertTrue(user.email_change_is_pending())
661

    
662
        # link was sent
663
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
664
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
665

    
666
        # proper email change
667
        data = {'new_email_address': 'kpap@yahoo.com'}
668
        r = self.client.post(ui_url('email_change'), data, follow=True)
669
        self.assertRedirects(r, ui_url('profile'))
670
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
671
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
672
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
673
        change2 = EmailChange.objects.get()
674

    
675
        r = self.client.get(change1.get_url())
676
        self.assertEquals(r.status_code, 404)
677
        self.client.logout()
678

    
679
        invalid_client = Client()
680
        r = invalid_client.post(ui_url('local?'),
681
                                {'username': 'existing@synnefo.org',
682
                                 'password': 'password'})
683
        r = invalid_client.get(change2.get_url(), follow=True)
684
        self.assertEquals(r.status_code, 403)
685

    
686
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
687
                             {'username': 'kpap@synnefo.org',
688
                              'password': 'password',
689
                              'next': change2.get_url()},
690
                             follow=True)
691
        self.assertRedirects(r, ui_url('profile'))
692
        user = r.context['request'].user
693
        self.assertEquals(user.email, 'kpap@yahoo.com')
694
        self.assertEquals(user.username, 'kpap@yahoo.com')
695

    
696
        self.client.logout()
697
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
698
                             {'username': 'kpap@synnefo.org',
699
                              'password': 'password',
700
                              'next': change2.get_url()},
701
                             follow=True)
702
        self.assertContains(r, "Please enter a correct username and password")
703
        self.assertEqual(user.emailchanges.count(), 0)
704

    
705
        AstakosUser.objects.all().delete()
706
        Group.objects.all().delete()
707

    
708

    
709
class TestAuthProviderViews(TestCase):
710

    
711
    def tearDown(self):
712
        AstakosUser.objects.all().delete()
713

    
714
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
715
                         AUTOMODERATE_POLICY=True)
716
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
717
                 FORCE_PROFILE_UPDATE=False)
718
    def test_user(self):
719
        Profile = AuthProviderPolicyProfile
720
        Pending = PendingThirdPartyUser
721
        User = AstakosUser
722

    
723
        User.objects.create(email="newuser@synnefo.org")
724
        get_local_user("olduser@synnefo.org")
725
        cl_olduser = ShibbolethClient()
726
        get_local_user("olduser2@synnefo.org")
727
        ShibbolethClient()
728
        cl_newuser = ShibbolethClient()
729
        cl_newuser2 = Client()
730

    
731
        academic_group, created = Group.objects.get_or_create(
732
            name='academic-login')
733
        academic_users = academic_group.user_set
734
        assert created
735
        policy_only_academic = Profile.objects.add_policy('academic_strict',
736
                                                          'shibboleth',
737
                                                          academic_group,
738
                                                          exclusive=True,
739
                                                          login=False,
740
                                                          add=False)
741

    
742
        # new academic user
743
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
744
        cl_newuser.set_tokens(eppn="newusereppn", mail="newuser@synnefo.org",
745
                              surname="Lastname")
746
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
747
        initial = r.context['signup_form'].initial
748
        pending = Pending.objects.get()
749
        self.assertEqual(initial.get('last_name'), 'Lastname')
750
        self.assertEqual(initial.get('email'), 'newuser@synnefo.org')
751
        identifier = pending.third_party_identifier
752
        signup_data = {'third_party_identifier': identifier,
753
                       'first_name': 'Academic',
754
                       'third_party_token': pending.token,
755
                       'last_name': 'New User',
756
                       'provider': 'shibboleth'}
757
        r = cl_newuser.post(ui_url('signup'), signup_data)
758
        self.assertContains(r, "This field is required", )
759
        signup_data['email'] = 'olduser@synnefo.org'
760
        r = cl_newuser.post(ui_url('signup'), signup_data)
761
        self.assertContains(r, "already an account with this email", )
762
        signup_data['email'] = 'newuser@synnefo.org'
763
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
764
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
765
        self.assertEqual(r.status_code, 404)
766
        newuser = User.objects.get(email="newuser@synnefo.org")
767
        activation_link = newuser.get_activation_url()
768
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
769

    
770
        # new non-academic user
771
        signup_data = {'first_name': 'Non Academic',
772
                       'last_name': 'New User',
773
                       'provider': 'local',
774
                       'password1': 'password',
775
                       'password2': 'password'}
776
        signup_data['email'] = 'olduser@synnefo.org'
777
        r = cl_newuser2.post(ui_url('signup'), signup_data)
778
        self.assertContains(r, 'There is already an account with this '
779
                               'email address')
780
        signup_data['email'] = 'newuser@synnefo.org'
781
        r = cl_newuser2.post(ui_url('signup/'), signup_data)
782
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
783
        r = self.client.get(activation_link, follow=True)
784
        self.assertEqual(r.status_code, 404)
785
        newuser = User.objects.get(email="newuser@synnefo.org")
786
        self.assertTrue(newuser.activation_sent)
787

    
788
        # activation sent, user didn't open verification url so additional
789
        # registrations invalidate the previous signups.
790
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
791
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
792
        pending = Pending.objects.get()
793
        identifier = pending.third_party_identifier
794
        signup_data = {'third_party_identifier': identifier,
795
                       'first_name': 'Academic',
796
                       'third_party_token': pending.token,
797
                       'last_name': 'New User',
798
                       'provider': 'shibboleth'}
799
        signup_data['email'] = 'newuser@synnefo.org'
800
        r = cl_newuser.post(ui_url('signup'), signup_data)
801
        self.assertEqual(r.status_code, 302)
802
        newuser = User.objects.get(email="newuser@synnefo.org")
803
        self.assertTrue(newuser.activation_sent)
804
        activation_link = newuser.get_activation_url()
805
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
806
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
807
        self.assertRedirects(r, ui_url('landing'))
808
        newuser = User.objects.get(email="newuser@synnefo.org")
809
        self.assertEqual(newuser.is_active, True)
810
        self.assertEqual(newuser.email_verified, True)
811
        cl_newuser.logout()
812

    
813
        # cannot reactivate if suspended
814
        newuser.is_active = False
815
        newuser.save()
816
        r = cl_newuser.get(newuser.get_activation_url())
817
        newuser = User.objects.get(email="newuser@synnefo.org")
818
        self.assertFalse(newuser.is_active)
819

    
820
        # release suspension
821
        newuser.is_active = True
822
        newuser.save()
823

    
824
        cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
825
        local = auth.get_provider('local', newuser)
826
        self.assertEqual(local.get_add_policy, False)
827
        self.assertEqual(local.get_login_policy, False)
828
        r = cl_newuser.get(local.get_add_url, follow=True)
829
        self.assertRedirects(r, ui_url('profile'))
830
        self.assertContains(r, 'disabled for your')
831

    
832
        cl_olduser.login(username='olduser@synnefo.org', password="password")
833
        r = cl_olduser.get(ui_url('profile'), follow=True)
834
        self.assertEqual(r.status_code, 200)
835
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
836
        self.assertContains(r, 'Your request is missing a unique token')
837
        cl_olduser.set_tokens(eppn="newusereppn")
838
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
839
        self.assertContains(r, 'already in use')
840
        cl_olduser.set_tokens(eppn="oldusereppn")
841
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
842
        self.assertContains(r, 'Academic login enabled for this account')
843

    
844
        user = User.objects.get(email="olduser@synnefo.org")
845
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
846
        local_provider = user.get_auth_provider('local')
847
        self.assertEqual(shib_provider.get_remove_policy, True)
848
        self.assertEqual(local_provider.get_remove_policy, True)
849

    
850
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
851
                                                          'shibboleth',
852
                                                          academic_group,
853
                                                          remove=False)
854
        user.groups.add(academic_group)
855
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
856
        local_provider = user.get_auth_provider('local')
857
        self.assertEqual(shib_provider.get_remove_policy, False)
858
        self.assertEqual(local_provider.get_remove_policy, True)
859
        self.assertEqual(local_provider.get_login_policy, False)
860

    
861
        cl_olduser.logout()
862
        login_data = {'username': 'olduser@synnefo.org',
863
                      'password': 'password'}
864
        r = cl_olduser.post(ui_url('local'), login_data, follow=True)
865
        self.assertContains(r, "login/shibboleth'>Academic login")
866
        Group.objects.all().delete()
867

    
868

    
869
class TestAuthProvidersAPI(TestCase):
870
    """
871
    Test auth_providers module API
872
    """
873

    
874
    def tearDown(self):
875
        Group.objects.all().delete()
876

    
877
    @im_settings(IM_MODULES=['local', 'shibboleth'])
878
    def test_create(self):
879
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
880
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
881

    
882
        module = 'shibboleth'
883
        identifier = 'SHIB_UUID'
884
        provider_params = {
885
            'affiliation': 'UNIVERSITY',
886
            'info': {'age': 27}
887
        }
888
        provider = auth.get_provider(module, user2, identifier,
889
                                     **provider_params)
890
        provider.add_to_user()
891
        provider = auth.get_provider(module, user, identifier,
892
                                     **provider_params)
893
        provider.add_to_user()
894
        user.email_verified = True
895
        user.save()
896
        self.assertRaises(Exception, provider.add_to_user)
897
        provider = user.get_auth_provider(module, identifier)
898
        self.assertEqual(user.get_auth_provider(
899
            module, identifier)._instance.info.get('age'), 27)
900

    
901
        module = 'local'
902
        identifier = None
903
        provider_params = {'auth_backend': 'ldap', 'info':
904
                          {'office': 'A1'}}
905
        provider = auth.get_provider(module, user, identifier,
906
                                     **provider_params)
907
        provider.add_to_user()
908
        self.assertFalse(provider.get_add_policy)
909
        self.assertRaises(Exception, provider.add_to_user)
910

    
911
        shib = user.get_auth_provider('shibboleth',
912
                                      'SHIB_UUID')
913
        self.assertTrue(shib.get_remove_policy)
914

    
915
        local = user.get_auth_provider('local')
916
        self.assertTrue(local.get_remove_policy)
917

    
918
        local.remove_from_user()
919
        self.assertFalse(shib.get_remove_policy)
920
        self.assertRaises(Exception, shib.remove_from_user)
921

    
922
        provider = user.get_auth_providers()[0]
923
        self.assertRaises(Exception, provider.add_to_user)
924

    
925
    @im_settings(IM_MODULES=['local', 'shibboleth'])
926
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
927
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
928
                                                 'group2'])
929
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
930
                        CREATION_GROUPS_POLICY=['localgroup-create',
931
                                                'group-create'])
932
    def test_add_groups(self):
933
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
934
        provider = auth.get_provider('shibboleth', user, 'test123')
935
        provider.add_to_user()
936
        user = AstakosUser.objects.get()
937
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
938
                              sorted([u'group1', u'group2', u'group-create']))
939

    
940
        local = auth.get_provider('local', user)
941
        local.add_to_user()
942
        provider = user.get_auth_provider('shibboleth')
943
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
944
        provider.remove_from_user()
945
        user = AstakosUser.objects.get()
946
        self.assertEqual(len(user.get_auth_providers()), 1)
947
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
948
                              sorted([u'group-create', u'localgroup']))
949

    
950
        local = user.get_auth_provider('local')
951
        self.assertRaises(Exception, local.remove_from_user)
952
        provider = auth.get_provider('shibboleth', user, 'test123')
953
        provider.add_to_user()
954
        user = AstakosUser.objects.get()
955
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
956
                              sorted([u'group-create', u'group1', u'group2',
957
                               u'localgroup']))
958
        Group.objects.all().delete()
959

    
960

    
961

    
962
    @im_settings(IM_MODULES=['local', 'shibboleth'])
963
    def test_policies(self):
964
        group_old, created = Group.objects.get_or_create(name='olduser')
965

    
966
        astakos_settings.MODERATION_ENABLED = True
967
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
968
            ['academic-user']
969
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
970
            ['google-user']
971

    
972
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
973
        user.groups.add(group_old)
974
        user.add_auth_provider('local')
975

    
976
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
977
        user2.add_auth_provider('shibboleth', identifier='shibid')
978

    
979
        user3 = AstakosUser.objects.create(email="kpap3@synnefo.org")
980
        user3.groups.add(group_old)
981
        user3.add_auth_provider('local')
982
        user3.add_auth_provider('shibboleth', identifier='1234')
983

    
984
        self.assertTrue(user2.groups.get(name='academic-user'))
985
        self.assertFalse(user2.groups.filter(name='olduser').count())
986

    
987
        local = auth_providers.get_provider('local')
988
        self.assertTrue(local.get_add_policy)
989

    
990
        academic_group = Group.objects.get(name='academic-user')
991
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
992
                                                     academic_group,
993
                                                     exclusive=True,
994
                                                     add=False,
995
                                                     login=False)
996
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
997
                                                     academic_group,
998
                                                     exclusive=True,
999
                                                     login=False,
1000
                                                     add=False)
1001
        # no duplicate entry gets created
1002
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1003

    
1004
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1005
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1006
                                                     user2,
1007
                                                     remove=False)
1008
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1009

    
1010
        local = auth_providers.get_provider('local', user2)
1011
        google = auth_providers.get_provider('google', user2)
1012
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1013
        self.assertTrue(shibboleth.get_login_policy)
1014
        self.assertFalse(shibboleth.get_remove_policy)
1015
        self.assertFalse(local.get_add_policy)
1016
        self.assertFalse(local.get_add_policy)
1017
        self.assertFalse(google.get_add_policy)
1018

    
1019
        user2.groups.remove(Group.objects.get(name='academic-user'))
1020
        self.assertTrue(local.get_add_policy)
1021
        self.assertTrue(google.get_add_policy)
1022
        user2.groups.add(Group.objects.get(name='academic-user'))
1023

    
1024
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1025
                                                     user2,
1026
                                                     exclusive=True,
1027
                                                     add=True)
1028
        self.assertTrue(local.get_add_policy)
1029
        self.assertTrue(google.get_add_policy)
1030

    
1031
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1032
        self.assertFalse(local.get_automoderate_policy)
1033
        self.assertFalse(google.get_automoderate_policy)
1034
        self.assertTrue(shibboleth.get_automoderate_policy)
1035

    
1036
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1037
                  'GOOGLE_ADD_GROUPS_POLICY']:
1038
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1039

    
1040

    
1041
    @shibboleth_settings(CREATE_POLICY=True)
1042
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1043
    def test_create_http(self):
1044
        # this should be wrapped inside a transaction
1045
        user = AstakosUser(email="test@test.com")
1046
        user.save()
1047
        provider = auth_providers.get_provider('shibboleth', user,
1048
                                               'test@academia.test')
1049
        provider.add_to_user()
1050
        user.get_auth_provider('shibboleth', 'test@academia.test')
1051
        provider = auth_providers.get_provider('local', user)
1052
        provider.add_to_user()
1053
        user.get_auth_provider('local')
1054

    
1055
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1056
        user = AstakosUser(email="test2@test.com")
1057
        user.save()
1058
        provider = auth_providers.get_provider('shibboleth', user,
1059
                                               'test@shibboleth.com',
1060
                                               **{'info': {'name':
1061
                                                                'User Test'}})
1062
        self.assertFalse(provider.get_create_policy)
1063
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1064
        self.assertTrue(provider.get_create_policy)
1065
        academic = provider.add_to_user()
1066

    
1067
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1068
    @shibboleth_settings(LIMIT_POLICY=2)
1069
    def test_policies(self):
1070
        user = get_local_user('kpap@synnefo.org')
1071
        user.add_auth_provider('shibboleth', identifier='1234')
1072
        user.add_auth_provider('shibboleth', identifier='12345')
1073

    
1074
        # default limit is 1
1075
        local = user.get_auth_provider('local')
1076
        self.assertEqual(local.get_add_policy, False)
1077

    
1078
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1079
        academic = user.get_auth_provider('shibboleth',
1080
                                          identifier='1234')
1081
        self.assertEqual(academic.get_add_policy, False)
1082
        newacademic = auth_providers.get_provider('shibboleth', user,
1083
                                                  identifier='123456')
1084
        self.assertEqual(newacademic.get_add_policy, True)
1085
        user.add_auth_provider('shibboleth', identifier='123456')
1086
        self.assertEqual(academic.get_add_policy, False)
1087
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1088

    
1089
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1090
    @shibboleth_settings(LIMIT_POLICY=2)
1091
    def test_messages(self):
1092
        user = get_local_user('kpap@synnefo.org')
1093
        user.add_auth_provider('shibboleth', identifier='1234')
1094
        user.add_auth_provider('shibboleth', identifier='12345')
1095
        provider = auth_providers.get_provider('shibboleth')
1096
        self.assertEqual(provider.get_message('title'), 'Academic')
1097
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1098
        # regenerate messages cache
1099
        provider = auth_providers.get_provider('shibboleth')
1100
        self.assertEqual(provider.get_message('title'), 'New title')
1101
        self.assertEqual(provider.get_message('login_title'),
1102
                         'New title LOGIN')
1103
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1104
        self.assertEqual(provider.get_module_icon,
1105
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1106
        self.assertEqual(provider.get_module_medium_icon,
1107
                         settings.MEDIA_URL +
1108
                         'im/auth/icons-medium/shibboleth.png')
1109

    
1110
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1111
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1112
        self.assertEqual(provider.get_method_details_msg,
1113
                         'Account: 12345')
1114
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1115
        self.assertEqual(provider.get_method_details_msg,
1116
                         'Account: 1234')
1117

    
1118
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1119
        self.assertEqual(provider.get_not_active_msg,
1120
                         "'Academic login' is disabled.")
1121

    
1122
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1123
    @shibboleth_settings(LIMIT_POLICY=2)
1124
    def test_templates(self):
1125
        user = get_local_user('kpap@synnefo.org')
1126
        user.add_auth_provider('shibboleth', identifier='1234')
1127
        user.add_auth_provider('shibboleth', identifier='12345')
1128

    
1129
        provider = auth_providers.get_provider('shibboleth')
1130
        self.assertEqual(provider.get_template('login'),
1131
                         'im/auth/shibboleth_login.html')
1132
        provider = auth_providers.get_provider('google')
1133
        self.assertEqual(provider.get_template('login'),
1134
                         'im/auth/generic_login.html')
1135

    
1136

    
1137
class TestActivationBackend(TestCase):
1138

    
1139
    def setUp(self):
1140
        # dummy call to pass through logging middleware
1141
        self.client.get(ui_url(''))
1142

    
1143
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1144
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1145
    def test_policies(self):
1146
        backend = activation_backends.get_backend()
1147

    
1148
        # email matches RE_USER_EMAIL_PATTERNS
1149
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1150
                               is_active=False, email_verified=False)
1151
        backend.handle_verification(user1, user1.verification_code)
1152
        self.assertEqual(user1.accepted_policy, 'email')
1153

    
1154
        # manually moderated
1155
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1156
                               is_active=False, email_verified=False)
1157

    
1158
        backend.handle_verification(user2, user2.verification_code)
1159
        self.assertEqual(user2.moderated, False)
1160
        backend.handle_moderation(user2)
1161
        self.assertEqual(user2.moderated, True)
1162
        self.assertEqual(user2.accepted_policy, 'manual')
1163

    
1164
        # autoaccept due to provider automoderate policy
1165
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1166
                               is_active=False, email_verified=False)
1167
        user3.auth_providers.all().delete()
1168
        user3.add_auth_provider('shibboleth', identifier='shib123')
1169
        backend.handle_verification(user3, user3.verification_code)
1170
        self.assertEqual(user3.moderated, True)
1171
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1172

    
1173
    @im_settings(MODERATION_ENABLED=False,
1174
                 MANAGERS=(('Manager',
1175
                            'manager@synnefo.org'),),
1176
                 HELPDESK=(('Helpdesk',
1177
                            'helpdesk@synnefo.org'),),
1178
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1179
    def test_without_moderation(self):
1180
        backend = activation_backends.get_backend()
1181
        form = backend.get_signup_form('local')
1182
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1183

    
1184
        user_data = {
1185
            'email': 'kpap@synnefo.org',
1186
            'first_name': 'Kostas Papas',
1187
            'password1': '123',
1188
            'password2': '123'
1189
        }
1190
        form = backend.get_signup_form('local', user_data)
1191
        user = form.save(commit=False)
1192
        form.store_user(user)
1193
        self.assertEqual(user.is_active, False)
1194
        self.assertEqual(user.email_verified, False)
1195

    
1196
        # step one, registration
1197
        result = backend.handle_registration(user)
1198
        user = AstakosUser.objects.get()
1199
        self.assertEqual(user.is_active, False)
1200
        self.assertEqual(user.email_verified, False)
1201
        self.assertTrue(user.verification_code)
1202
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1203
        backend.send_result_notifications(result, user)
1204
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1205
        self.assertEqual(len(mail.outbox), 1)
1206

    
1207
        # step two, verify email (automatically
1208
        # moderates/accepts user, since moderation is disabled)
1209
        user = AstakosUser.objects.get()
1210
        valid_code = user.verification_code
1211

    
1212
        # test invalid code
1213
        result = backend.handle_verification(user, valid_code)
1214
        backend.send_result_notifications(result, user)
1215
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1216
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1217
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1218
        # verification + activated + greeting = 3
1219
        self.assertEqual(len(mail.outbox), 3)
1220
        user = AstakosUser.objects.get()
1221
        self.assertEqual(user.is_active, True)
1222
        self.assertEqual(user.moderated, True)
1223
        self.assertTrue(user.moderated_at)
1224
        self.assertEqual(user.email_verified, True)
1225
        self.assertTrue(user.activation_sent)
1226

    
1227
    @im_settings(MODERATION_ENABLED=True,
1228
                 MANAGERS=(('Manager',
1229
                            'manager@synnefo.org'),),
1230
                 HELPDESK=(('Helpdesk',
1231
                            'helpdesk@synnefo.org'),),
1232
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1233
    def test_with_moderation(self):
1234

    
1235
        backend = activation_backends.get_backend()
1236
        form = backend.get_signup_form('local')
1237
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1238

    
1239
        user_data = {
1240
            'email': 'kpap@synnefo.org',
1241
            'first_name': 'Kostas Papas',
1242
            'password1': '123',
1243
            'password2': '123'
1244
        }
1245
        form = backend.get_signup_form(provider='local',
1246
                                       initial_data=user_data)
1247
        user = form.save(commit=False)
1248
        form.store_user(user)
1249
        self.assertEqual(user.is_active, False)
1250
        self.assertEqual(user.email_verified, False)
1251

    
1252
        # step one, registration
1253
        result = backend.handle_registration(user)
1254
        user = AstakosUser.objects.get()
1255
        self.assertEqual(user.is_active, False)
1256
        self.assertEqual(user.email_verified, False)
1257
        self.assertTrue(user.verification_code)
1258
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1259
        backend.send_result_notifications(result, user)
1260
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1261
        self.assertEqual(len(mail.outbox), 1)
1262

    
1263
        # step two, verifying email
1264
        user = AstakosUser.objects.get()
1265
        valid_code = user.verification_code
1266
        invalid_code = user.verification_code + 'invalid'
1267

    
1268
        # test invalid code
1269
        result = backend.handle_verification(user, invalid_code)
1270
        self.assertEqual(result.status, backend.Result.ERROR)
1271
        backend.send_result_notifications(result, user)
1272
        user = AstakosUser.objects.get()
1273
        self.assertEqual(user.is_active, False)
1274
        self.assertEqual(user.moderated, False)
1275
        self.assertEqual(user.moderated_at, None)
1276
        self.assertEqual(user.email_verified, False)
1277
        self.assertTrue(user.activation_sent)
1278

    
1279
        # test valid code
1280
        user = AstakosUser.objects.get()
1281
        result = backend.handle_verification(user, valid_code)
1282
        backend.send_result_notifications(result, user)
1283
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1284
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1285
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1286
        self.assertEqual(len(mail.outbox), 2)
1287
        user = AstakosUser.objects.get()
1288
        self.assertEqual(user.moderated, False)
1289
        self.assertEqual(user.moderated_at, None)
1290
        self.assertEqual(user.email_verified, True)
1291
        self.assertTrue(user.activation_sent)
1292

    
1293
        # test code reuse
1294
        result = backend.handle_verification(user, valid_code)
1295
        self.assertEqual(result.status, backend.Result.ERROR)
1296
        user = AstakosUser.objects.get()
1297
        self.assertEqual(user.is_active, False)
1298
        self.assertEqual(user.moderated, False)
1299
        self.assertEqual(user.moderated_at, None)
1300
        self.assertEqual(user.email_verified, True)
1301
        self.assertTrue(user.activation_sent)
1302

    
1303
        # valid code on verified user
1304
        user = AstakosUser.objects.get()
1305
        valid_code = user.verification_code
1306
        result = backend.handle_verification(user, valid_code)
1307
        self.assertEqual(result.status, backend.Result.ERROR)
1308

    
1309
        # step three, moderation user
1310
        user = AstakosUser.objects.get()
1311
        result = backend.handle_moderation(user)
1312
        backend.send_result_notifications(result, user)
1313

    
1314
        user = AstakosUser.objects.get()
1315
        self.assertEqual(user.is_active, True)
1316
        self.assertEqual(user.moderated, True)
1317
        self.assertTrue(user.moderated_at)
1318
        self.assertEqual(user.email_verified, True)
1319
        self.assertTrue(user.activation_sent)
1320

    
1321

    
1322
class TestWebloginRedirect(TestCase):
1323

    
1324
    @with_settings(settings, COOKIE_DOMAIN='.astakos.synnefo.org')
1325
    def test_restricts_domains(self):
1326
        get_local_user('user1@synnefo.org')
1327

    
1328
        # next url construct helpers
1329
        weblogin = lambda nxt: reverse('weblogin') + '?next=%s' % nxt
1330
        weblogin_quoted = lambda nxt: reverse('weblogin') + '?next=%s' % \
1331
            urllib.quote_plus(nxt)
1332

    
1333
        # common cases
1334
        invalid_domain = weblogin("https://www.invaliddomain.synnefo.org")
1335
        invalid_scheme = weblogin("customscheme://localhost")
1336
        invalid_scheme_with_valid_domain = \
1337
                weblogin("http://www.invaliddomain.com")
1338
        valid_scheme = weblogin("pithos://localhost/")
1339
        # to be used in assertRedirects
1340
        valid_scheme_quoted = weblogin_quoted("pithos://localhost/")
1341

    
1342
        # not authenticated, redirects to login which contains next param with
1343
        # additional nested quoted next params
1344
        r = self.client.get(valid_scheme, follow=True)
1345
        login_redirect = reverse('login') + '?next=' + \
1346
            urllib.quote_plus("http://testserver" + valid_scheme_quoted)
1347
        self.assertRedirects(r, login_redirect)
1348

    
1349
        # authenticate client
1350
        self.client.login(username="user1@synnefo.org", password="password")
1351

    
1352
        # valid scheme
1353
        r = self.client.get(valid_scheme, follow=True)
1354
        url = r.redirect_chain[1][0]
1355
        # scheme preserved
1356
        self.assertTrue(url.startswith('pithos://localhost/'))
1357
        # redirect contains token param
1358
        params = urlparse.urlparse(url.replace('pithos', 'https'),
1359
                                   scheme='https').query
1360
        params = urlparse.parse_qs(params)
1361
        self.assertEqual(params['token'][0],
1362
                         AstakosUser.objects.get().auth_token)
1363
        # does not contain uuid
1364
        # reverted for 0.14.2 to support old pithos desktop clients
1365
        #self.assertFalse('uuid' in params)
1366

    
1367
        # invalid cases
1368
        r = self.client.get(invalid_scheme, follow=True)
1369
        self.assertEqual(r.status_code, 403)
1370

    
1371
        r = self.client.get(invalid_scheme_with_valid_domain, follow=True)
1372
        self.assertEqual(r.status_code, 403)
1373

    
1374
        r = self.client.get(invalid_domain, follow=True)
1375
        self.assertEqual(r.status_code, 403)