Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ e585664e

History | View | Annotate | Download (56.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from astakos.im.tests.common import *
35

    
36

    
37
class ShibbolethTests(TestCase):
38
    """
39
    Testing shibboleth authentication.
40
    """
41

    
42
    fixtures = ['groups']
43

    
44
    def setUp(self):
45
        self.client = ShibbolethClient()
46
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
47
        astakos_settings.MODERATION_ENABLED = True
48

    
49
    @im_settings(FORCE_PROFILE_UPDATE=False)
50
    def test_create_account(self):
51

    
52
        client = ShibbolethClient()
53

    
54
        # shibboleth views validation
55
        # eepn required
56
        r = client.get('/im/login/shibboleth?', follow=True)
57
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
58
            'domain': astakos_settings.BASEURL,
59
            'contact_email': settings.CONTACT_EMAIL
60
        })
61
        client.set_tokens(eppn="kpapeppn")
62

    
63
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
64
        # shibboleth user info required
65
        r = client.get('/im/login/shibboleth?', follow=True)
66
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
67
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
68

    
69
        # shibboleth logged us in
70
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
71
                          cn="Kostas Papadimitriou",
72
                          ep_affiliation="Test Affiliation")
73
        r = client.get('/im/login/shibboleth?', follow=True)
74
        token = PendingThirdPartyUser.objects.get().token
75
        self.assertRedirects(r, '/im/signup?third_party_token=%s' % token)
76
        self.assertEqual(r.status_code, 200)
77

    
78
        # a new pending user created
79
        pending_user = PendingThirdPartyUser.objects.get(
80
            third_party_identifier="kpapeppn")
81
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
82
        # keep the token for future use
83
        token = pending_user.token
84
        # from now on no shibboleth headers are sent to the server
85
        client.reset_tokens()
86

    
87
        # this is the old way, it should fail, to avoid pending user take over
88
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
89
        self.assertEqual(r.status_code, 404)
90

    
91
        # this is the signup unique url associated with the pending user
92
        # created
93
        r = client.get('/im/signup/?third_party_token=%s' % token)
94
        identifier = pending_user.third_party_identifier
95
        post_data = {'third_party_identifier': identifier,
96
                     'first_name': 'Kostas',
97
                     'third_party_token': token,
98
                     'last_name': 'Mitroglou',
99
                     'provider': 'shibboleth'}
100

    
101
        signup_url = reverse('signup')
102

    
103
        # invlid email
104
        post_data['email'] = 'kpap'
105
        r = client.post(signup_url, post_data)
106
        self.assertContains(r, token)
107

    
108
        # existing email
109
        existing_user = get_local_user('test@test.com')
110
        post_data['email'] = 'test@test.com'
111
        r = client.post(signup_url, post_data)
112
        self.assertContains(r, messages.EMAIL_USED)
113
        existing_user.delete()
114

    
115
        # and finally a valid signup
116
        post_data['email'] = 'kpap@synnefo.org'
117
        r = client.post(signup_url, post_data, follow=True)
118
        self.assertContains(r, messages.VERIFICATION_SENT)
119

    
120
        # entires commited as expected
121
        self.assertEqual(AstakosUser.objects.count(), 1)
122
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
123
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
124

    
125
        # provider info stored
126
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
127
        self.assertEqual(provider.affiliation, 'Test Affiliation')
128
        self.assertEqual(provider.info, {u'email': u'kpap@synnefo.org',
129
                                         u'eppn': u'kpapeppn',
130
                                         u'name': u'Kostas Papadimitriou'})
131

    
132
        # login (not activated yet)
133
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
134
                          cn="Kostas Papadimitriou", )
135
        r = client.get("/im/login/shibboleth?", follow=True)
136
        self.assertContains(r, 'is pending moderation')
137

    
138
        # admin activates the user
139
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
140
        backend = activation_backends.get_backend()
141
        activation_result = backend.verify_user(u, u.verification_code)
142
        activation_result = backend.accept_user(u)
143
        self.assertFalse(activation_result.is_error())
144
        backend.send_result_notifications(activation_result, u)
145
        self.assertEqual(u.is_active, True)
146

    
147
        # we see our profile
148
        r = client.get("/im/login/shibboleth?", follow=True)
149
        self.assertRedirects(r, '/im/landing')
150
        self.assertEqual(r.status_code, 200)
151

    
152
    def test_existing(self):
153
        """
154
        Test adding of third party login to an existing account
155
        """
156

    
157
        # this is our existing user
158
        existing_user = get_local_user('kpap@synnefo.org')
159
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
160
        existing_inactive.is_active = False
161
        existing_inactive.save()
162

    
163
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
164
        existing_unverified.is_active = False
165
        existing_unverified.activation_sent = None
166
        existing_unverified.email_verified = False
167
        existing_unverified.is_verified = False
168
        existing_unverified.save()
169

    
170
        client = ShibbolethClient()
171
        # shibboleth logged us in, notice that we use different email
172
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
173
                          cn="Kostas Papadimitriou", )
174
        r = client.get("/im/login/shibboleth?", follow=True)
175

    
176
        # a new pending user created
177
        pending_user = PendingThirdPartyUser.objects.get()
178
        token = pending_user.token
179
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
180
        pending_key = pending_user.token
181
        client.reset_tokens()
182
        self.assertRedirects(r, "/im/signup?third_party_token=%s" % token)
183

    
184
        form = r.context['login_form']
185
        signupdata = copy.copy(form.initial)
186
        signupdata['email'] = 'kpap@synnefo.org'
187
        signupdata['third_party_token'] = token
188
        signupdata['provider'] = 'shibboleth'
189
        signupdata.pop('id', None)
190

    
191
        # the email exists to another user
192
        r = client.post("/im/signup", signupdata)
193
        self.assertContains(r, "There is already an account with this email "
194
                               "address")
195
        # change the case, still cannot create
196
        signupdata['email'] = 'KPAP@synnefo.org'
197
        r = client.post("/im/signup", signupdata)
198
        self.assertContains(r, "There is already an account with this email "
199
                               "address")
200
        # inactive user
201
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
202
        r = client.post("/im/signup", signupdata)
203
        self.assertContains(r, "There is already an account with this email "
204
                               "address")
205

    
206
        # unverified user, this should pass, old entry will be deleted
207
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
208
        r = client.post("/im/signup", signupdata)
209

    
210
        post_data = {'password': 'password',
211
                     'username': 'kpap@synnefo.org'}
212
        r = client.post('/im/local', post_data, follow=True)
213
        self.assertTrue(r.context['request'].user.is_authenticated())
214
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
215
                          cn="Kostas Papadimitriou", )
216
        r = client.get("/im/login/shibboleth?", follow=True)
217
        self.assertContains(r, "enabled for this account")
218
        client.reset_tokens()
219

    
220
        user = existing_user
221
        self.assertTrue(user.has_auth_provider('shibboleth'))
222
        self.assertTrue(user.has_auth_provider('local',
223
                                               auth_backend='astakos'))
224
        client.logout()
225

    
226
        # look Ma, i can login with both my shibboleth and local account
227
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
228
                          cn="Kostas Papadimitriou")
229
        r = client.get("/im/login/shibboleth?", follow=True)
230
        self.assertTrue(r.context['request'].user.is_authenticated())
231
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
232
        self.assertRedirects(r, '/im/landing')
233
        self.assertEqual(r.status_code, 200)
234
        client.logout()
235
        client.reset_tokens()
236

    
237
        # logged out
238
        r = client.get("/im/profile", follow=True)
239
        self.assertFalse(r.context['request'].user.is_authenticated())
240

    
241
        # login with local account also works
242
        post_data = {'password': 'password',
243
                     'username': 'kpap@synnefo.org'}
244
        r = self.client.post('/im/local', post_data, follow=True)
245
        self.assertTrue(r.context['request'].user.is_authenticated())
246
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
247
        self.assertRedirects(r, '/im/landing')
248
        self.assertEqual(r.status_code, 200)
249

    
250
        # cannot add the same eppn
251
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
252
                          cn="Kostas Papadimitriou", )
253
        r = client.get("/im/login/shibboleth?", follow=True)
254
        self.assertRedirects(r, '/im/landing')
255
        self.assertTrue(r.status_code, 200)
256
        self.assertEquals(existing_user.auth_providers.count(), 2)
257

    
258
        # only one allowed by default
259
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
260
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
261
        prov = auth_providers.get_provider('shibboleth')
262
        r = client.get("/im/login/shibboleth?", follow=True)
263
        self.assertContains(r, "Failed to add")
264
        self.assertRedirects(r, '/im/profile')
265
        self.assertTrue(r.status_code, 200)
266
        self.assertEquals(existing_user.auth_providers.count(), 2)
267
        client.logout()
268
        client.reset_tokens()
269

    
270
        # cannot login with another eppn
271
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppninvalid",
272
                          cn="Kostas Papadimitriou")
273
        r = client.get("/im/login/shibboleth?", follow=True)
274
        self.assertFalse(r.context['request'].user.is_authenticated())
275

    
276
        # cannot
277

    
278
        # lets remove local password
279
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
280
                                       email="kpap@synnefo.org")
281
        remove_local_url = user.get_auth_provider('local').get_remove_url
282
        remove_shibbo_url = user.get_auth_provider('shibboleth',
283
                                                   'kpapeppn').get_remove_url
284
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
285
                          cn="Kostas Papadimtriou")
286
        r = client.get("/im/login/shibboleth?", follow=True)
287
        client.reset_tokens()
288

    
289
        # TODO: this view should use POST
290
        r = client.get(remove_local_url)
291
        # 2 providers left
292
        self.assertEqual(user.auth_providers.count(), 1)
293
        # cannot remove last provider
294
        r = client.get(remove_shibbo_url)
295
        self.assertEqual(r.status_code, 403)
296
        self.client.logout()
297

    
298
        # cannot login using local credentials (notice we use another client)
299
        post_data = {'password': 'password',
300
                     'username': 'kpap@synnefo.org'}
301
        r = self.client.post('/im/local', post_data, follow=True)
302
        self.assertFalse(r.context['request'].user.is_authenticated())
303

    
304
        # we can reenable the local provider by setting a password
305
        r = client.get("/im/password_change", follow=True)
306
        r = client.post("/im/password_change", {'new_password1': '111',
307
                                                'new_password2': '111'},
308
                        follow=True)
309
        user = r.context['request'].user
310
        self.assertTrue(user.has_auth_provider('local'))
311
        self.assertTrue(user.has_auth_provider('shibboleth'))
312
        self.assertTrue(user.check_password('111'))
313
        self.assertTrue(user.has_usable_password())
314
        self.client.logout()
315

    
316
        # now we can login
317
        post_data = {'password': '111',
318
                     'username': 'kpap@synnefo.org'}
319
        r = self.client.post('/im/local', post_data, follow=True)
320
        self.assertTrue(r.context['request'].user.is_authenticated())
321

    
322
        client.reset_tokens()
323

    
324
        # we cannot take over another shibboleth identifier
325
        user2 = get_local_user('another@synnefo.org')
326
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
327
        # login
328
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
329
                          cn="Kostas Papadimitriou")
330
        r = client.get("/im/login/shibboleth?", follow=True)
331
        # try to assign existing shibboleth identifier of another user
332
        client.set_tokens(mail="kpap_second@shibboleth.gr",
333
                          eppn="existingeppn", cn="Kostas Papadimitriou")
334
        r = client.get("/im/login/shibboleth?", follow=True)
335
        self.assertContains(r, "this account is already assigned")
336

    
337

    
338
class TestLocal(TestCase):
339

    
340
    fixtures = ['groups']
341

    
342
    def setUp(self):
343
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
344
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
345
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
346
        settings.ASTAKOS_MODERATION_ENABLED = True
347

    
348
    def tearDown(self):
349
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
350

    
351
    def test_no_moderation(self):
352
        # disable moderation
353
        astakos_settings.MODERATION_ENABLED = False
354

    
355
        # create a new user
356
        r = self.client.get("/im/signup")
357
        self.assertEqual(r.status_code, 200)
358
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
359
                'password2': 'password', 'first_name': 'Kostas',
360
                'last_name': 'Mitroglou', 'provider': 'local'}
361
        r = self.client.post("/im/signup", data)
362

    
363
        # user created
364
        self.assertEqual(AstakosUser.objects.count(), 1)
365
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
366
                                       email="kpap@synnefo.org")
367
        self.assertEqual(user.username, 'kpap@synnefo.org')
368
        self.assertEqual(user.has_auth_provider('local'), True)
369
        self.assertFalse(user.is_active)
370

    
371
        # user (but not admin) gets notified
372
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
373
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
374
        astakos_settings.MODERATION_ENABLED = True
375

    
376
    def test_email_case(self):
377
        data = {
378
            'email': 'kPap@synnefo.org',
379
            'password1': '1234',
380
            'password2': '1234'
381
        }
382

    
383
        form = forms.LocalUserCreationForm(data)
384
        self.assertTrue(form.is_valid())
385
        user = form.save()
386
        form.store_user(user, {})
387

    
388
        u = AstakosUser.objects.get()
389
        self.assertEqual(u.email, 'kPap@synnefo.org')
390
        self.assertEqual(u.username, 'kpap@synnefo.org')
391
        u.is_active = True
392
        u.email_verified = True
393
        u.save()
394

    
395
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
396
        login = forms.LoginForm(data=data)
397
        self.assertTrue(login.is_valid())
398

    
399
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
400
        login = forms.LoginForm(data=data)
401
        self.assertTrue(login.is_valid())
402

    
403
        data = {
404
            'email': 'kpap@synnefo.org',
405
            'password1': '1234',
406
            'password2': '1234'
407
        }
408
        form = forms.LocalUserCreationForm(data)
409
        self.assertFalse(form.is_valid())
410

    
411
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
412
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
413
    def test_local_provider(self):
414
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
415

    
416
        # create a user
417
        r = self.client.get("/im/signup")
418
        self.assertEqual(r.status_code, 200)
419
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
420
                'password2': 'password', 'first_name': 'Kostas',
421
                'last_name': 'Mitroglou', 'provider': 'local'}
422
        r = self.client.post("/im/signup", data)
423

    
424
        # user created
425
        self.assertEqual(AstakosUser.objects.count(), 1)
426
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
427
                                       email="kpap@synnefo.org")
428
        self.assertEqual(user.username, 'kpap@synnefo.org')
429
        self.assertEqual(user.has_auth_provider('local'), True)
430
        self.assertFalse(user.is_active)  # not activated
431
        self.assertFalse(user.email_verified)  # not verified
432
        self.assertTrue(user.activation_sent)  # activation automatically sent
433
        self.assertFalse(user.moderated)
434
        self.assertFalse(user.email_verified)
435

    
436
        # admin gets notified and activates the user from the command line
437
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
438
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
439
                                           'password': 'password'},
440
                             follow=True)
441
        self.assertContains(r, messages.VERIFICATION_SENT)
442
        backend = activation_backends.get_backend()
443

    
444
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
445
        backend.send_user_verification_email(user)
446

    
447
        # user activation fields updated and user gets notified via email
448
        user = AstakosUser.objects.get(pk=user.pk)
449
        self.assertTrue(user.activation_sent)
450
        self.assertFalse(user.email_verified)
451
        self.assertFalse(user.is_active)
452
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
453

    
454
        # user forgot she got registered and tries to submit registration
455
        # form. Notice the upper case in email
456
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
457
                'password2': 'password', 'first_name': 'Kostas',
458
                'last_name': 'Mitroglou', 'provider': 'local'}
459
        r = self.client.post("/im/signup", data, follow=True)
460
        self.assertRedirects(r, reverse('index'))
461
        self.assertContains(r, messages.VERIFICATION_SENT)
462

    
463
        user = AstakosUser.objects.get()
464
        # previous user replaced
465
        self.assertTrue(user.activation_sent)
466
        self.assertFalse(user.email_verified)
467
        self.assertFalse(user.is_active)
468
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
469

    
470
        # hmmm, email exists; lets request a password change
471
        r = self.client.get('/im/local/password_reset')
472
        self.assertEqual(r.status_code, 200)
473
        data = {'email': 'kpap@synnefo.org'}
474
        r = self.client.post('/im/local/password_reset', data, follow=True)
475
        # she can't because account is not active yet
476
        self.assertContains(r, 'pending activation')
477

    
478
        # moderation is enabled and an activation email has already been sent
479
        # so user can trigger resend of the activation email
480
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
481
        self.assertContains(r, 'has been sent to your email address.')
482
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
483

    
484
        # also she cannot login
485
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
486
        r = self.client.post('/im/local', data, follow=True)
487
        self.assertContains(r, 'Resend activation')
488
        self.assertFalse(r.context['request'].user.is_authenticated())
489
        self.assertFalse('_pithos2_a' in self.client.cookies)
490

    
491
        # user sees the message and resends activation
492
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
493
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
494

    
495
        # logged in user cannot activate another account
496
        tmp_user = get_local_user("test_existing_user@synnefo.org")
497
        tmp_client = Client()
498
        tmp_client.login(username="test_existing_user@synnefo.org",
499
                         password="password")
500
        r = tmp_client.get(user.get_activation_url(), follow=True)
501
        self.assertContains(r, messages.LOGGED_IN_WARNING)
502

    
503
        r = self.client.get(user.get_activation_url(), follow=True)
504
        # previous code got invalidated
505
        self.assertEqual(r.status_code, 404)
506

    
507
        user = AstakosUser.objects.get(pk=user.pk)
508
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
509
        r = self.client.get(user.get_activation_url(), follow=True)
510
        self.assertRedirects(r, reverse('index'))
511
        # user sees that account is pending approval from admins
512
        self.assertContains(r, messages.NOTIFICATION_SENT)
513
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
514

    
515
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
516
        result = backend.handle_moderation(user)
517
        backend.send_result_notifications(result, user)
518
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
519
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
520

    
521
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
522
        r = self.client.get('/im/profile', follow=True)
523
        self.assertFalse(r.context['request'].user.is_authenticated())
524
        self.assertFalse('_pithos2_a' in self.client.cookies)
525
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
526

    
527
        user = AstakosUser.objects.get(pk=user.pk)
528
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
529
                                           'password': 'password'},
530
                             follow=True)
531
        # user activated and logged in, token cookie set
532
        self.assertTrue(r.context['request'].user.is_authenticated())
533
        self.assertTrue('_pithos2_a' in self.client.cookies)
534
        cookies = self.client.cookies
535
        self.assertTrue(quote(user.auth_token) in
536
                        cookies.get('_pithos2_a').value)
537
        r = self.client.get('/im/logout', follow=True)
538
        r = self.client.get('/im/')
539
        # user logged out, token cookie removed
540
        self.assertFalse(r.context['request'].user.is_authenticated())
541
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
542

    
543
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
544
        del self.client.cookies['_pithos2_a']
545

    
546
        # user can login
547
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
548
                                           'password': 'password'},
549
                             follow=True)
550
        self.assertTrue(r.context['request'].user.is_authenticated())
551
        self.assertTrue('_pithos2_a' in self.client.cookies)
552
        cookies = self.client.cookies
553
        self.assertTrue(quote(user.auth_token) in
554
                        cookies.get('_pithos2_a').value)
555
        self.client.get('/im/logout', follow=True)
556

    
557
        # user forgot password
558
        old_pass = user.password
559
        r = self.client.get('/im/local/password_reset')
560
        self.assertEqual(r.status_code, 200)
561
        r = self.client.post('/im/local/password_reset', {'email':
562
                                                          'kpap@synnefo.org'})
563
        self.assertEqual(r.status_code, 302)
564
        # email sent
565
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
566

    
567
        # user visits change password link
568
        user = AstakosUser.objects.get(pk=user.pk)
569
        r = self.client.get(user.get_password_reset_url())
570
        r = self.client.post(user.get_password_reset_url(),
571
                             {'new_password1': 'newpass',
572
                              'new_password2': 'newpass'})
573

    
574
        user = AstakosUser.objects.get(pk=user.pk)
575
        self.assertNotEqual(old_pass, user.password)
576

    
577
        # old pass is not usable
578
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
579
                                           'password': 'password'})
580
        self.assertContains(r, 'Please enter a correct username and password')
581
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
582
                                           'password': 'newpass'},
583
                             follow=True)
584
        self.assertTrue(r.context['request'].user.is_authenticated())
585
        self.client.logout()
586

    
587
        # tests of special local backends
588
        user = AstakosUser.objects.get(pk=user.pk)
589
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
590
        user.save()
591

    
592
        # non astakos local backends do not support password reset
593
        r = self.client.get('/im/local/password_reset')
594
        self.assertEqual(r.status_code, 200)
595
        r = self.client.post('/im/local/password_reset', {'email':
596
                                                          'kpap@synnefo.org'})
597
        # she can't because account is not active yet
598
        self.assertContains(r, "Changing password is not")
599

    
600

    
601
class UserActionsTests(TestCase):
602

    
603
    def test_email_change(self):
604
        # to test existing email validation
605
        get_local_user('existing@synnefo.org')
606

    
607
        # local user
608
        user = get_local_user('kpap@synnefo.org')
609

    
610
        # login as kpap
611
        self.client.login(username='kpap@synnefo.org', password='password')
612
        r = self.client.get('/im/profile', follow=True)
613
        user = r.context['request'].user
614
        self.assertTrue(user.is_authenticated())
615

    
616
        # change email is enabled
617
        r = self.client.get('/im/email_change')
618
        self.assertEqual(r.status_code, 200)
619
        self.assertFalse(user.email_change_is_pending())
620

    
621
        # request email change to an existing email fails
622
        data = {'new_email_address': 'existing@synnefo.org'}
623
        r = self.client.post('/im/email_change', data)
624
        self.assertContains(r, messages.EMAIL_USED)
625

    
626
        # proper email change
627
        data = {'new_email_address': 'kpap@gmail.com'}
628
        r = self.client.post('/im/email_change', data, follow=True)
629
        self.assertRedirects(r, '/im/profile')
630
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
631
        change1 = EmailChange.objects.get()
632

    
633
        # user sees a warning
634
        r = self.client.get('/im/email_change')
635
        self.assertEqual(r.status_code, 200)
636
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
637
        self.assertTrue(user.email_change_is_pending())
638

    
639
        # link was sent
640
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
641
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
642

    
643
        # proper email change
644
        data = {'new_email_address': 'kpap@yahoo.com'}
645
        r = self.client.post('/im/email_change', data, follow=True)
646
        self.assertRedirects(r, '/im/profile')
647
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
648
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
649
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
650
        change2 = EmailChange.objects.get()
651

    
652
        r = self.client.get(change1.get_url())
653
        self.assertEquals(r.status_code, 302)
654
        self.client.logout()
655

    
656
        r = self.client.post('/im/local?next=' + change2.get_url(),
657
                             {'username': 'kpap@synnefo.org',
658
                              'password': 'password',
659
                              'next': change2.get_url()},
660
                             follow=True)
661
        self.assertRedirects(r, '/im/profile')
662
        user = r.context['request'].user
663
        self.assertEquals(user.email, 'kpap@yahoo.com')
664
        self.assertEquals(user.username, 'kpap@yahoo.com')
665

    
666
        self.client.logout()
667
        r = self.client.post('/im/local?next=' + change2.get_url(),
668
                             {'username': 'kpap@synnefo.org',
669
                              'password': 'password',
670
                              'next': change2.get_url()},
671
                             follow=True)
672
        self.assertContains(r, "Please enter a correct username and password")
673
        self.assertEqual(user.emailchanges.count(), 0)
674

    
675

    
676
class TestAuthProviderViews(TestCase):
677

    
678
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
679
                         AUTOMODERATE_POLICY=True)
680
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
681
                 FORCE_PROFILE_UPDATE=False)
682
    def test_user(self):
683
        Profile = AuthProviderPolicyProfile
684
        Pending = PendingThirdPartyUser
685
        User = AstakosUser
686

    
687
        User.objects.create(email="newuser@synnefo.org")
688
        get_local_user("olduser@synnefo.org")
689
        cl_olduser = ShibbolethClient()
690
        get_local_user("olduser2@synnefo.org")
691
        ShibbolethClient()
692
        cl_newuser = ShibbolethClient()
693
        cl_newuser2 = Client()
694

    
695
        academic_group, created = Group.objects.get_or_create(
696
            name='academic-login')
697
        academic_users = academic_group.user_set
698
        assert created
699
        policy_only_academic = Profile.objects.add_policy('academic_strict',
700
                                                          'shibboleth',
701
                                                          academic_group,
702
                                                          exclusive=True,
703
                                                          login=False,
704
                                                          add=False)
705

    
706
        # new academic user
707
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
708
        cl_newuser.set_tokens(eppn="newusereppn")
709
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
710
        pending = Pending.objects.get()
711
        identifier = pending.third_party_identifier
712
        signup_data = {'third_party_identifier': identifier,
713
                       'first_name': 'Academic',
714
                       'third_party_token': pending.token,
715
                       'last_name': 'New User',
716
                       'provider': 'shibboleth'}
717
        r = cl_newuser.post('/im/signup', signup_data)
718
        self.assertContains(r, "This field is required", )
719
        signup_data['email'] = 'olduser@synnefo.org'
720
        r = cl_newuser.post('/im/signup', signup_data)
721
        self.assertContains(r, "already an account with this email", )
722
        signup_data['email'] = 'newuser@synnefo.org'
723
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
724
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
725
        self.assertEqual(r.status_code, 404)
726
        newuser = User.objects.get(email="newuser@synnefo.org")
727
        activation_link = newuser.get_activation_url()
728
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
729

    
730
        # new non-academic user
731
        signup_data = {'first_name': 'Non Academic',
732
                       'last_name': 'New User',
733
                       'provider': 'local',
734
                       'password1': 'password',
735
                       'password2': 'password'}
736
        signup_data['email'] = 'olduser@synnefo.org'
737
        r = cl_newuser2.post('/im/signup', signup_data)
738
        self.assertContains(r, 'There is already an account with this '
739
                               'email address')
740
        signup_data['email'] = 'newuser@synnefo.org'
741
        r = cl_newuser2.post('/im/signup/', signup_data)
742
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
743
        r = self.client.get(activation_link, follow=True)
744
        self.assertEqual(r.status_code, 404)
745
        newuser = User.objects.get(email="newuser@synnefo.org")
746
        self.assertTrue(newuser.activation_sent)
747

    
748
        # activation sent, user didn't open verification url so additional
749
        # registrations invalidate the previous signups.
750
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
751
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
752
        pending = Pending.objects.get()
753
        identifier = pending.third_party_identifier
754
        signup_data = {'third_party_identifier': identifier,
755
                       'first_name': 'Academic',
756
                       'third_party_token': pending.token,
757
                       'last_name': 'New User',
758
                       'provider': 'shibboleth'}
759
        signup_data['email'] = 'newuser@synnefo.org'
760
        r = cl_newuser.post('/im/signup', signup_data)
761
        self.assertEqual(r.status_code, 302)
762
        newuser = User.objects.get(email="newuser@synnefo.org")
763
        self.assertTrue(newuser.activation_sent)
764
        activation_link = newuser.get_activation_url()
765
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
766
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
767
        self.assertRedirects(r, '/im/landing')
768
        newuser = User.objects.get(email="newuser@synnefo.org")
769
        self.assertEqual(newuser.is_active, True)
770
        self.assertEqual(newuser.email_verified, True)
771
        cl_newuser.logout()
772

    
773
        # cannot reactivate if suspended
774
        newuser.is_active = False
775
        newuser.save()
776
        r = cl_newuser.get(newuser.get_activation_url())
777
        newuser = User.objects.get(email="newuser@synnefo.org")
778
        self.assertFalse(newuser.is_active)
779

    
780
        # release suspension
781
        newuser.is_active = True
782
        newuser.save()
783

    
784
        cl_newuser.get('/im/login/shibboleth?', follow=True)
785
        local = auth.get_provider('local', newuser)
786
        self.assertEqual(local.get_add_policy, False)
787
        self.assertEqual(local.get_login_policy, False)
788
        r = cl_newuser.get(local.get_add_url, follow=True)
789
        self.assertRedirects(r, '/im/profile')
790
        self.assertContains(r, 'disabled for your')
791

    
792
        cl_olduser.login(username='olduser@synnefo.org', password="password")
793
        r = cl_olduser.get('/im/profile', follow=True)
794
        self.assertEqual(r.status_code, 200)
795
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
796
        self.assertContains(r, 'Your request is missing a unique token')
797
        cl_olduser.set_tokens(eppn="newusereppn")
798
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
799
        self.assertContains(r, 'is already assigned to another user')
800
        cl_olduser.set_tokens(eppn="oldusereppn")
801
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
802
        self.assertContains(r, 'Academic login enabled for this account')
803

    
804
        user = User.objects.get(email="olduser@synnefo.org")
805
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
806
        local_provider = user.get_auth_provider('local')
807
        self.assertEqual(shib_provider.get_remove_policy, True)
808
        self.assertEqual(local_provider.get_remove_policy, True)
809

    
810
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
811
                                                          'shibboleth',
812
                                                          academic_group,
813
                                                          remove=False)
814
        user.groups.add(academic_group)
815
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
816
        local_provider = user.get_auth_provider('local')
817
        self.assertEqual(shib_provider.get_remove_policy, False)
818
        self.assertEqual(local_provider.get_remove_policy, True)
819
        self.assertEqual(local_provider.get_login_policy, False)
820

    
821
        cl_olduser.logout()
822
        login_data = {'username': 'olduser@synnefo.org', 'password': 'password'}
823
        r = cl_olduser.post('/im/local', login_data, follow=True)
824
        self.assertContains(r, "href='/im/login/shibboleth'>Academic login")
825
        Group.objects.all().delete()
826

    
827

    
828
class TestAuthProvidersAPI(TestCase):
829
    """
830
    Test auth_providers module API
831
    """
832

    
833
    @im_settings(IM_MODULES=['local', 'shibboleth'])
834
    def test_create(self):
835
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
836
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
837

    
838
        module = 'shibboleth'
839
        identifier = 'SHIB_UUID'
840
        provider_params = {
841
            'affiliation': 'UNIVERSITY',
842
            'info': {'age': 27}
843
        }
844
        provider = auth.get_provider(module, user2, identifier,
845
                                     **provider_params)
846
        provider.add_to_user()
847
        provider = auth.get_provider(module, user, identifier,
848
                                     **provider_params)
849
        provider.add_to_user()
850
        user.email_verified = True
851
        user.save()
852
        self.assertRaises(Exception, provider.add_to_user)
853
        provider = user.get_auth_provider(module, identifier)
854
        self.assertEqual(user.get_auth_provider(
855
            module, identifier)._instance.info.get('age'), 27)
856

    
857
        module = 'local'
858
        identifier = None
859
        provider_params = {'auth_backend': 'ldap', 'info':
860
                          {'office': 'A1'}}
861
        provider = auth.get_provider(module, user, identifier,
862
                                     **provider_params)
863
        provider.add_to_user()
864
        self.assertFalse(provider.get_add_policy)
865
        self.assertRaises(Exception, provider.add_to_user)
866

    
867
        shib = user.get_auth_provider('shibboleth',
868
                                      'SHIB_UUID')
869
        self.assertTrue(shib.get_remove_policy)
870

    
871
        local = user.get_auth_provider('local')
872
        self.assertTrue(local.get_remove_policy)
873

    
874
        local.remove_from_user()
875
        self.assertFalse(shib.get_remove_policy)
876
        self.assertRaises(Exception, shib.remove_from_user)
877

    
878
        provider = user.get_auth_providers()[0]
879
        self.assertRaises(Exception, provider.add_to_user)
880

    
881
    @im_settings(IM_MODULES=['local', 'shibboleth'])
882
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
883
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
884
                                                 'group2'])
885
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
886
                        CREATION_GROUPS_POLICY=['localgroup-create',
887
                                                'group-create'])
888
    def test_add_groups(self):
889
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
890
        provider = auth.get_provider('shibboleth', user, 'test123')
891
        provider.add_to_user()
892
        user = AstakosUser.objects.get()
893
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
894
                              sorted([u'group1', u'group2', u'group-create']))
895

    
896
        local = auth.get_provider('local', user)
897
        local.add_to_user()
898
        provider = user.get_auth_provider('shibboleth')
899
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
900
        provider.remove_from_user()
901
        user = AstakosUser.objects.get()
902
        self.assertEqual(len(user.get_auth_providers()), 1)
903
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
904
                              sorted([u'group-create', u'localgroup']))
905

    
906
        local = user.get_auth_provider('local')
907
        self.assertRaises(Exception, local.remove_from_user)
908
        provider = auth.get_provider('shibboleth', user, 'test123')
909
        provider.add_to_user()
910
        user = AstakosUser.objects.get()
911
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
912
                              sorted([u'group-create', u'group1', u'group2',
913
                               u'localgroup']))
914
        Group.objects.all().delete()
915

    
916

    
917

    
918
    @im_settings(IM_MODULES=['local', 'shibboleth'])
919
    def test_policies(self):
920
        group_old, created = Group.objects.get_or_create(name='olduser')
921

    
922
        astakos_settings.MODERATION_ENABLED = True
923
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
924
            ['academic-user']
925
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
926
            ['google-user']
927

    
928
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
929
        user.groups.add(group_old)
930
        user.add_auth_provider('local')
931

    
932
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
933
        user2.add_auth_provider('shibboleth', identifier='shibid')
934

    
935
        user3 = AstakosUser.objects.create(email="kpap3@synnefo.org")
936
        user3.groups.add(group_old)
937
        user3.add_auth_provider('local')
938
        user3.add_auth_provider('shibboleth', identifier='1234')
939

    
940
        self.assertTrue(user2.groups.get(name='academic-user'))
941
        self.assertFalse(user2.groups.filter(name='olduser').count())
942

    
943
        local = auth_providers.get_provider('local')
944
        self.assertTrue(local.get_add_policy)
945

    
946
        academic_group = Group.objects.get(name='academic-user')
947
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
948
                                                     academic_group,
949
                                                     exclusive=True,
950
                                                     add=False,
951
                                                     login=False)
952
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
953
                                                     academic_group,
954
                                                     exclusive=True,
955
                                                     login=False,
956
                                                     add=False)
957
        # no duplicate entry gets created
958
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
959

    
960
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
961
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
962
                                                     user2,
963
                                                     remove=False)
964
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
965

    
966
        local = auth_providers.get_provider('local', user2)
967
        google = auth_providers.get_provider('google', user2)
968
        shibboleth = auth_providers.get_provider('shibboleth', user2)
969
        self.assertTrue(shibboleth.get_login_policy)
970
        self.assertFalse(shibboleth.get_remove_policy)
971
        self.assertFalse(local.get_add_policy)
972
        self.assertFalse(local.get_add_policy)
973
        self.assertFalse(google.get_add_policy)
974

    
975
        user2.groups.remove(Group.objects.get(name='academic-user'))
976
        self.assertTrue(local.get_add_policy)
977
        self.assertTrue(google.get_add_policy)
978
        user2.groups.add(Group.objects.get(name='academic-user'))
979

    
980
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
981
                                                     user2,
982
                                                     exclusive=True,
983
                                                     add=True)
984
        self.assertTrue(local.get_add_policy)
985
        self.assertTrue(google.get_add_policy)
986

    
987
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
988
        self.assertFalse(local.get_automoderate_policy)
989
        self.assertFalse(google.get_automoderate_policy)
990
        self.assertTrue(shibboleth.get_automoderate_policy)
991

    
992
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
993
                  'GOOGLE_ADD_GROUPS_POLICY']:
994
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
995

    
996

    
997
    @shibboleth_settings(CREATE_POLICY=True)
998
    @im_settings(IM_MODULES=['local', 'shibboleth'])
999
    def test_create_http(self):
1000
        # this should be wrapped inside a transaction
1001
        user = AstakosUser(email="test@test.com")
1002
        user.save()
1003
        provider = auth_providers.get_provider('shibboleth', user,
1004
                                               'test@academia.test')
1005
        provider.add_to_user()
1006
        user.get_auth_provider('shibboleth', 'test@academia.test')
1007
        provider = auth_providers.get_provider('local', user)
1008
        provider.add_to_user()
1009
        user.get_auth_provider('local')
1010

    
1011
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1012
        user = AstakosUser(email="test2@test.com")
1013
        user.save()
1014
        provider = auth_providers.get_provider('shibboleth', user,
1015
                                               'test@shibboleth.com',
1016
                                               **{'info': {'name':
1017
                                                                'User Test'}})
1018
        self.assertFalse(provider.get_create_policy)
1019
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1020
        self.assertTrue(provider.get_create_policy)
1021
        academic = provider.add_to_user()
1022

    
1023
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1024
    @shibboleth_settings(LIMIT_POLICY=2)
1025
    def test_policies(self):
1026
        user = get_local_user('kpap@synnefo.org')
1027
        user.add_auth_provider('shibboleth', identifier='1234')
1028
        user.add_auth_provider('shibboleth', identifier='12345')
1029

    
1030
        # default limit is 1
1031
        local = user.get_auth_provider('local')
1032
        self.assertEqual(local.get_add_policy, False)
1033

    
1034
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1035
        academic = user.get_auth_provider('shibboleth',
1036
                                          identifier='1234')
1037
        self.assertEqual(academic.get_add_policy, False)
1038
        newacademic = auth_providers.get_provider('shibboleth', user,
1039
                                                  identifier='123456')
1040
        self.assertEqual(newacademic.get_add_policy, True)
1041
        user.add_auth_provider('shibboleth', identifier='123456')
1042
        self.assertEqual(academic.get_add_policy, False)
1043
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1044

    
1045
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1046
    @shibboleth_settings(LIMIT_POLICY=2)
1047
    def test_messages(self):
1048
        user = get_local_user('kpap@synnefo.org')
1049
        user.add_auth_provider('shibboleth', identifier='1234')
1050
        user.add_auth_provider('shibboleth', identifier='12345')
1051
        provider = auth_providers.get_provider('shibboleth')
1052
        self.assertEqual(provider.get_message('title'), 'Academic')
1053
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1054
        # regenerate messages cache
1055
        provider = auth_providers.get_provider('shibboleth')
1056
        self.assertEqual(provider.get_message('title'), 'New title')
1057
        self.assertEqual(provider.get_message('login_title'),
1058
                         'New title LOGIN')
1059
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1060
        self.assertEqual(provider.get_module_icon,
1061
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1062
        self.assertEqual(provider.get_module_medium_icon,
1063
                         settings.MEDIA_URL +
1064
                         'im/auth/icons-medium/shibboleth.png')
1065

    
1066
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1067
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1068
        self.assertEqual(provider.get_method_details_msg,
1069
                         'Account: 12345')
1070
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1071
        self.assertEqual(provider.get_method_details_msg,
1072
                         'Account: 1234')
1073

    
1074
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1075
        self.assertEqual(provider.get_not_active_msg,
1076
                         "'Academic login' is disabled.")
1077

    
1078
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1079
    @shibboleth_settings(LIMIT_POLICY=2)
1080
    def test_templates(self):
1081
        user = get_local_user('kpap@synnefo.org')
1082
        user.add_auth_provider('shibboleth', identifier='1234')
1083
        user.add_auth_provider('shibboleth', identifier='12345')
1084

    
1085
        provider = auth_providers.get_provider('shibboleth')
1086
        self.assertEqual(provider.get_template('login'),
1087
                         'im/auth/shibboleth_login.html')
1088
        provider = auth_providers.get_provider('google')
1089
        self.assertEqual(provider.get_template('login'),
1090
                         'im/auth/generic_login.html')
1091

    
1092

    
1093
class TestActivationBackend(TestCase):
1094

    
1095
    def setUp(self):
1096
        # dummy call to pass through logging middleware
1097
        self.client.get('/im/')
1098

    
1099
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1100
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1101
    def test_policies(self):
1102
        backend = activation_backends.get_backend()
1103

    
1104
        # email matches RE_USER_EMAIL_PATTERNS
1105
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1106
                               is_active=False, email_verified=False)
1107
        backend.handle_verification(user1, user1.verification_code)
1108
        self.assertEqual(user1.accepted_policy, 'email')
1109

    
1110
        # manually moderated
1111
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1112
                               is_active=False, email_verified=False)
1113

    
1114
        backend.handle_verification(user2, user2.verification_code)
1115
        self.assertEqual(user2.moderated, False)
1116
        backend.handle_moderation(user2)
1117
        self.assertEqual(user2.moderated, True)
1118
        self.assertEqual(user2.accepted_policy, 'manual')
1119

    
1120
        # autoaccept due to provider automoderate policy
1121
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1122
                               is_active=False, email_verified=False)
1123
        user3.auth_providers.all().delete()
1124
        user3.add_auth_provider('shibboleth', identifier='shib123')
1125
        backend.handle_verification(user3, user3.verification_code)
1126
        self.assertEqual(user3.moderated, True)
1127
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1128

    
1129
    @im_settings(MODERATION_ENABLED=False,
1130
                 MANAGERS=(('Manager',
1131
                            'manager@synnefo.org'),),
1132
                 HELPDESK=(('Helpdesk',
1133
                            'helpdesk@synnefo.org'),),
1134
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1135
    def test_without_moderation(self):
1136
        backend = activation_backends.get_backend()
1137
        form = backend.get_signup_form('local')
1138
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1139

    
1140
        user_data = {
1141
            'email': 'kpap@synnefo.org',
1142
            'first_name': 'Kostas Papas',
1143
            'password1': '123',
1144
            'password2': '123'
1145
        }
1146
        form = backend.get_signup_form('local', user_data)
1147
        user = form.save(commit=False)
1148
        form.store_user(user)
1149
        self.assertEqual(user.is_active, False)
1150
        self.assertEqual(user.email_verified, False)
1151

    
1152
        # step one, registration
1153
        result = backend.handle_registration(user)
1154
        user = AstakosUser.objects.get()
1155
        self.assertEqual(user.is_active, False)
1156
        self.assertEqual(user.email_verified, False)
1157
        self.assertTrue(user.verification_code)
1158
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1159
        backend.send_result_notifications(result, user)
1160
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1161
        self.assertEqual(len(mail.outbox), 1)
1162

    
1163
        # step two, verify email (automatically
1164
        # moderates/accepts user, since moderation is disabled)
1165
        user = AstakosUser.objects.get()
1166
        valid_code = user.verification_code
1167

    
1168
        # test invalid code
1169
        result = backend.handle_verification(user, valid_code)
1170
        backend.send_result_notifications(result, user)
1171
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1172
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1173
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1174
        # verification + activated + greeting = 3
1175
        self.assertEqual(len(mail.outbox), 3)
1176
        user = AstakosUser.objects.get()
1177
        self.assertEqual(user.is_active, True)
1178
        self.assertEqual(user.moderated, True)
1179
        self.assertTrue(user.moderated_at)
1180
        self.assertEqual(user.email_verified, True)
1181
        self.assertTrue(user.activation_sent)
1182

    
1183
    @im_settings(MODERATION_ENABLED=True,
1184
                 MANAGERS=(('Manager',
1185
                            'manager@synnefo.org'),),
1186
                 HELPDESK=(('Helpdesk',
1187
                            'helpdesk@synnefo.org'),),
1188
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1189
    def test_with_moderation(self):
1190

    
1191
        backend = activation_backends.get_backend()
1192
        form = backend.get_signup_form('local')
1193
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1194

    
1195
        user_data = {
1196
            'email': 'kpap@synnefo.org',
1197
            'first_name': 'Kostas Papas',
1198
            'password1': '123',
1199
            'password2': '123'
1200
        }
1201
        form = backend.get_signup_form(provider='local',
1202
                                       initial_data=user_data)
1203
        user = form.save(commit=False)
1204
        form.store_user(user)
1205
        self.assertEqual(user.is_active, False)
1206
        self.assertEqual(user.email_verified, False)
1207

    
1208
        # step one, registration
1209
        result = backend.handle_registration(user)
1210
        user = AstakosUser.objects.get()
1211
        self.assertEqual(user.is_active, False)
1212
        self.assertEqual(user.email_verified, False)
1213
        self.assertTrue(user.verification_code)
1214
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1215
        backend.send_result_notifications(result, user)
1216
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1217
        self.assertEqual(len(mail.outbox), 1)
1218

    
1219
        # step two, verifying email
1220
        user = AstakosUser.objects.get()
1221
        valid_code = user.verification_code
1222
        invalid_code = user.verification_code + 'invalid'
1223

    
1224
        # test invalid code
1225
        result = backend.handle_verification(user, invalid_code)
1226
        self.assertEqual(result.status, backend.Result.ERROR)
1227
        backend.send_result_notifications(result, user)
1228
        user = AstakosUser.objects.get()
1229
        self.assertEqual(user.is_active, False)
1230
        self.assertEqual(user.moderated, False)
1231
        self.assertEqual(user.moderated_at, None)
1232
        self.assertEqual(user.email_verified, False)
1233
        self.assertTrue(user.activation_sent)
1234

    
1235
        # test valid code
1236
        user = AstakosUser.objects.get()
1237
        result = backend.handle_verification(user, valid_code)
1238
        backend.send_result_notifications(result, user)
1239
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1240
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1241
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1242
        self.assertEqual(len(mail.outbox), 2)
1243
        user = AstakosUser.objects.get()
1244
        self.assertEqual(user.moderated, False)
1245
        self.assertEqual(user.moderated_at, None)
1246
        self.assertEqual(user.email_verified, True)
1247
        self.assertTrue(user.activation_sent)
1248

    
1249
        # test code reuse
1250
        result = backend.handle_verification(user, valid_code)
1251
        self.assertEqual(result.status, backend.Result.ERROR)
1252
        user = AstakosUser.objects.get()
1253
        self.assertEqual(user.is_active, False)
1254
        self.assertEqual(user.moderated, False)
1255
        self.assertEqual(user.moderated_at, None)
1256
        self.assertEqual(user.email_verified, True)
1257
        self.assertTrue(user.activation_sent)
1258

    
1259
        # valid code on verified user
1260
        user = AstakosUser.objects.get()
1261
        valid_code = user.verification_code
1262
        result = backend.handle_verification(user, valid_code)
1263
        self.assertEqual(result.status, backend.Result.ERROR)
1264

    
1265
        # step three, moderation user
1266
        user = AstakosUser.objects.get()
1267
        result = backend.handle_moderation(user)
1268
        backend.send_result_notifications(result, user)
1269

    
1270
        user = AstakosUser.objects.get()
1271
        self.assertEqual(user.is_active, True)
1272
        self.assertEqual(user.moderated, True)
1273
        self.assertTrue(user.moderated_at)
1274
        self.assertEqual(user.email_verified, True)
1275
        self.assertTrue(user.activation_sent)