Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ 17138f12

History | View | Annotate | Download (56.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from astakos.im.tests.common import *
35

    
36

    
37
class ShibbolethTests(TestCase):
38
    """
39
    Testing shibboleth authentication.
40
    """
41

    
42
    def setUp(self):
43
        self.client = ShibbolethClient()
44
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
45
        astakos_settings.MODERATION_ENABLED = True
46

    
47
    def tearDown(self):
48
        AstakosUser.objects.all().delete()
49

    
50
    @im_settings(FORCE_PROFILE_UPDATE=False)
51
    def test_create_account(self):
52

    
53
        client = ShibbolethClient()
54

    
55
        # shibboleth views validation
56
        # eepn required
57
        r = client.get('/im/login/shibboleth?', follow=True)
58
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
59
            'domain': astakos_settings.BASEURL,
60
            'contact_email': settings.CONTACT_EMAIL
61
        })
62
        client.set_tokens(eppn="kpapeppn")
63

    
64
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
65
        # shibboleth user info required
66
        r = client.get('/im/login/shibboleth?', follow=True)
67
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
68
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
69

    
70
        # shibboleth logged us in
71
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
72
                          cn="Kostas Papadimitriou",
73
                          ep_affiliation="Test Affiliation")
74
        r = client.get('/im/login/shibboleth?', follow=True)
75
        token = PendingThirdPartyUser.objects.get().token
76
        self.assertRedirects(r, '/im/signup?third_party_token=%s' % token)
77
        self.assertEqual(r.status_code, 200)
78

    
79
        # a new pending user created
80
        pending_user = PendingThirdPartyUser.objects.get(
81
            third_party_identifier="kpapeppn")
82
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
83
        # keep the token for future use
84
        token = pending_user.token
85
        # from now on no shibboleth headers are sent to the server
86
        client.reset_tokens()
87

    
88
        # this is the old way, it should fail, to avoid pending user take over
89
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
90
        self.assertEqual(r.status_code, 404)
91

    
92
        # this is the signup unique url associated with the pending user
93
        # created
94
        r = client.get('/im/signup/?third_party_token=%s' % token)
95
        identifier = pending_user.third_party_identifier
96
        post_data = {'third_party_identifier': identifier,
97
                     'first_name': 'Kostas',
98
                     'third_party_token': token,
99
                     'last_name': 'Mitroglou',
100
                     'provider': 'shibboleth'}
101

    
102
        signup_url = reverse('signup')
103

    
104
        # invlid email
105
        post_data['email'] = 'kpap'
106
        r = client.post(signup_url, post_data)
107
        self.assertContains(r, token)
108

    
109
        # existing email
110
        existing_user = get_local_user('test@test.com')
111
        post_data['email'] = 'test@test.com'
112
        r = client.post(signup_url, post_data)
113
        self.assertContains(r, messages.EMAIL_USED)
114
        existing_user.delete()
115

    
116
        # and finally a valid signup
117
        post_data['email'] = 'kpap@synnefo.org'
118
        r = client.post(signup_url, post_data, follow=True)
119
        self.assertContains(r, messages.VERIFICATION_SENT)
120

    
121
        # entires commited as expected
122
        self.assertEqual(AstakosUser.objects.count(), 1)
123
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
124
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
125

    
126
        # provider info stored
127
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
128
        self.assertEqual(provider.affiliation, 'Test Affiliation')
129
        self.assertEqual(provider.info, {u'email': u'kpap@synnefo.org',
130
                                         u'eppn': u'kpapeppn',
131
                                         u'name': u'Kostas Papadimitriou'})
132

    
133
        # login (not activated yet)
134
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
135
                          cn="Kostas Papadimitriou", )
136
        r = client.get("/im/login/shibboleth?", follow=True)
137
        self.assertContains(r, 'is pending moderation')
138

    
139
        # admin activates the user
140
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
141
        backend = activation_backends.get_backend()
142
        activation_result = backend.verify_user(u, u.verification_code)
143
        activation_result = backend.accept_user(u)
144
        self.assertFalse(activation_result.is_error())
145
        backend.send_result_notifications(activation_result, u)
146
        self.assertEqual(u.is_active, True)
147

    
148
        # we see our profile
149
        r = client.get("/im/login/shibboleth?", follow=True)
150
        self.assertRedirects(r, '/im/landing')
151
        self.assertEqual(r.status_code, 200)
152

    
153
    def test_existing(self):
154
        """
155
        Test adding of third party login to an existing account
156
        """
157

    
158
        # this is our existing user
159
        existing_user = get_local_user('kpap@synnefo.org')
160
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
161
        existing_inactive.is_active = False
162
        existing_inactive.save()
163

    
164
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
165
        existing_unverified.is_active = False
166
        existing_unverified.activation_sent = None
167
        existing_unverified.email_verified = False
168
        existing_unverified.is_verified = False
169
        existing_unverified.save()
170

    
171
        client = ShibbolethClient()
172
        # shibboleth logged us in, notice that we use different email
173
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
174
                          cn="Kostas Papadimitriou", )
175
        r = client.get("/im/login/shibboleth?", follow=True)
176

    
177
        # a new pending user created
178
        pending_user = PendingThirdPartyUser.objects.get()
179
        token = pending_user.token
180
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
181
        pending_key = pending_user.token
182
        client.reset_tokens()
183
        self.assertRedirects(r, "/im/signup?third_party_token=%s" % token)
184

    
185
        form = r.context['login_form']
186
        signupdata = copy.copy(form.initial)
187
        signupdata['email'] = 'kpap@synnefo.org'
188
        signupdata['third_party_token'] = token
189
        signupdata['provider'] = 'shibboleth'
190
        signupdata.pop('id', None)
191

    
192
        # the email exists to another user
193
        r = client.post("/im/signup", signupdata)
194
        self.assertContains(r, "There is already an account with this email "
195
                               "address")
196
        # change the case, still cannot create
197
        signupdata['email'] = 'KPAP@synnefo.org'
198
        r = client.post("/im/signup", signupdata)
199
        self.assertContains(r, "There is already an account with this email "
200
                               "address")
201
        # inactive user
202
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
203
        r = client.post("/im/signup", signupdata)
204
        self.assertContains(r, "There is already an account with this email "
205
                               "address")
206

    
207
        # unverified user, this should pass, old entry will be deleted
208
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
209
        r = client.post("/im/signup", signupdata)
210

    
211
        post_data = {'password': 'password',
212
                     'username': 'kpap@synnefo.org'}
213
        r = client.post('/im/local', post_data, follow=True)
214
        self.assertTrue(r.context['request'].user.is_authenticated())
215
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
216
                          cn="Kostas Papadimitriou", )
217
        r = client.get("/im/login/shibboleth?", follow=True)
218
        self.assertContains(r, "enabled for this account")
219
        client.reset_tokens()
220

    
221
        user = existing_user
222
        self.assertTrue(user.has_auth_provider('shibboleth'))
223
        self.assertTrue(user.has_auth_provider('local',
224
                                               auth_backend='astakos'))
225
        client.logout()
226

    
227
        # look Ma, i can login with both my shibboleth and local account
228
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
229
                          cn="Kostas Papadimitriou")
230
        r = client.get("/im/login/shibboleth?", follow=True)
231
        self.assertTrue(r.context['request'].user.is_authenticated())
232
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
233
        self.assertRedirects(r, '/im/landing')
234
        self.assertEqual(r.status_code, 200)
235
        client.logout()
236
        client.reset_tokens()
237

    
238
        # logged out
239
        r = client.get("/im/profile", follow=True)
240
        self.assertFalse(r.context['request'].user.is_authenticated())
241

    
242
        # login with local account also works
243
        post_data = {'password': 'password',
244
                     'username': 'kpap@synnefo.org'}
245
        r = self.client.post('/im/local', post_data, follow=True)
246
        self.assertTrue(r.context['request'].user.is_authenticated())
247
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
248
        self.assertRedirects(r, '/im/landing')
249
        self.assertEqual(r.status_code, 200)
250

    
251
        # cannot add the same eppn
252
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
253
                          cn="Kostas Papadimitriou", )
254
        r = client.get("/im/login/shibboleth?", follow=True)
255
        self.assertRedirects(r, '/im/landing')
256
        self.assertTrue(r.status_code, 200)
257
        self.assertEquals(existing_user.auth_providers.count(), 2)
258

    
259
        # only one allowed by default
260
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
261
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
262
        prov = auth_providers.get_provider('shibboleth')
263
        r = client.get("/im/login/shibboleth?", follow=True)
264
        self.assertContains(r, "Failed to add")
265
        self.assertRedirects(r, '/im/profile')
266
        self.assertTrue(r.status_code, 200)
267
        self.assertEquals(existing_user.auth_providers.count(), 2)
268
        client.logout()
269
        client.reset_tokens()
270

    
271
        # cannot login with another eppn
272
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppninvalid",
273
                          cn="Kostas Papadimitriou")
274
        r = client.get("/im/login/shibboleth?", follow=True)
275
        self.assertFalse(r.context['request'].user.is_authenticated())
276

    
277
        # cannot
278

    
279
        # lets remove local password
280
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
281
                                       email="kpap@synnefo.org")
282
        remove_local_url = user.get_auth_provider('local').get_remove_url
283
        remove_shibbo_url = user.get_auth_provider('shibboleth',
284
                                                   'kpapeppn').get_remove_url
285
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
286
                          cn="Kostas Papadimtriou")
287
        r = client.get("/im/login/shibboleth?", follow=True)
288
        client.reset_tokens()
289

    
290
        # only POST is allowed (for CSRF protection)
291
        r = client.get(remove_local_url, follow=True)
292
        self.assertEqual(r.status_code, 405)
293

    
294
        r = client.post(remove_local_url, follow=True)
295
        # 2 providers left
296
        self.assertEqual(user.auth_providers.count(), 1)
297
        # cannot remove last provider
298
        r = client.post(remove_shibbo_url)
299
        self.assertEqual(r.status_code, 403)
300
        self.client.logout()
301

    
302
        # cannot login using local credentials (notice we use another client)
303
        post_data = {'password': 'password',
304
                     'username': 'kpap@synnefo.org'}
305
        r = self.client.post('/im/local', post_data, follow=True)
306
        self.assertFalse(r.context['request'].user.is_authenticated())
307

    
308
        # we can reenable the local provider by setting a password
309
        r = client.get("/im/password_change", follow=True)
310
        r = client.post("/im/password_change", {'new_password1': '111',
311
                                                'new_password2': '111'},
312
                        follow=True)
313
        user = r.context['request'].user
314
        self.assertTrue(user.has_auth_provider('local'))
315
        self.assertTrue(user.has_auth_provider('shibboleth'))
316
        self.assertTrue(user.check_password('111'))
317
        self.assertTrue(user.has_usable_password())
318
        self.client.logout()
319

    
320
        # now we can login
321
        post_data = {'password': '111',
322
                     'username': 'kpap@synnefo.org'}
323
        r = self.client.post('/im/local', post_data, follow=True)
324
        self.assertTrue(r.context['request'].user.is_authenticated())
325

    
326
        client.reset_tokens()
327

    
328
        # we cannot take over another shibboleth identifier
329
        user2 = get_local_user('another@synnefo.org')
330
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
331
        # login
332
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
333
                          cn="Kostas Papadimitriou")
334
        r = client.get("/im/login/shibboleth?", follow=True)
335
        # try to assign existing shibboleth identifier of another user
336
        client.set_tokens(mail="kpap_second@shibboleth.gr",
337
                          eppn="existingeppn", cn="Kostas Papadimitriou")
338
        r = client.get("/im/login/shibboleth?", follow=True)
339
        self.assertContains(r, "is already in use")
340

    
341

    
342
class TestLocal(TestCase):
343

    
344
    def setUp(self):
345
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
346
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
347
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
348
        settings.ASTAKOS_MODERATION_ENABLED = True
349

    
350
    def tearDown(self):
351
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
352
        AstakosUser.objects.all().delete()
353

    
354
    def test_no_moderation(self):
355
        # disable moderation
356
        astakos_settings.MODERATION_ENABLED = False
357

    
358
        # create a new user
359
        r = self.client.get("/im/signup")
360
        self.assertEqual(r.status_code, 200)
361
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
362
                'password2': 'password', 'first_name': 'Kostas',
363
                'last_name': 'Mitroglou', 'provider': 'local'}
364
        r = self.client.post("/im/signup", data)
365

    
366
        # user created
367
        self.assertEqual(AstakosUser.objects.count(), 1)
368
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
369
                                       email="kpap@synnefo.org")
370
        self.assertEqual(user.username, 'kpap@synnefo.org')
371
        self.assertEqual(user.has_auth_provider('local'), True)
372
        self.assertFalse(user.is_active)
373

    
374
        # user (but not admin) gets notified
375
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
376
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
377
        astakos_settings.MODERATION_ENABLED = True
378

    
379
    def test_email_case(self):
380
        data = {
381
            'email': 'kPap@synnefo.org',
382
            'password1': '1234',
383
            'password2': '1234'
384
        }
385

    
386
        form = forms.LocalUserCreationForm(data)
387
        self.assertTrue(form.is_valid())
388
        user = form.save()
389
        form.store_user(user, {})
390

    
391
        u = AstakosUser.objects.get()
392
        self.assertEqual(u.email, 'kPap@synnefo.org')
393
        self.assertEqual(u.username, 'kpap@synnefo.org')
394
        u.is_active = True
395
        u.email_verified = True
396
        u.save()
397

    
398
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
399
        login = forms.LoginForm(data=data)
400
        self.assertTrue(login.is_valid())
401

    
402
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
403
        login = forms.LoginForm(data=data)
404
        self.assertTrue(login.is_valid())
405

    
406
        data = {
407
            'email': 'kpap@synnefo.org',
408
            'password1': '1234',
409
            'password2': '1234'
410
        }
411
        form = forms.LocalUserCreationForm(data)
412
        self.assertFalse(form.is_valid())
413

    
414
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
415
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
416
    def test_local_provider(self):
417
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
418

    
419
        # create a user
420
        r = self.client.get("/im/signup")
421
        self.assertEqual(r.status_code, 200)
422
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
423
                'password2': 'password', 'first_name': 'Kostas',
424
                'last_name': 'Mitroglou', 'provider': 'local'}
425
        r = self.client.post("/im/signup", data)
426

    
427
        # user created
428
        self.assertEqual(AstakosUser.objects.count(), 1)
429
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
430
                                       email="kpap@synnefo.org")
431
        self.assertEqual(user.username, 'kpap@synnefo.org')
432
        self.assertEqual(user.has_auth_provider('local'), True)
433
        self.assertFalse(user.is_active)  # not activated
434
        self.assertFalse(user.email_verified)  # not verified
435
        self.assertTrue(user.activation_sent)  # activation automatically sent
436
        self.assertFalse(user.moderated)
437
        self.assertFalse(user.email_verified)
438

    
439
        # admin gets notified and activates the user from the command line
440
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
441
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
442
                                           'password': 'password'},
443
                             follow=True)
444
        self.assertContains(r, messages.VERIFICATION_SENT)
445
        backend = activation_backends.get_backend()
446

    
447
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
448
        backend.send_user_verification_email(user)
449

    
450
        # user activation fields updated and user gets notified via email
451
        user = AstakosUser.objects.get(pk=user.pk)
452
        self.assertTrue(user.activation_sent)
453
        self.assertFalse(user.email_verified)
454
        self.assertFalse(user.is_active)
455
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
456

    
457
        # user forgot she got registered and tries to submit registration
458
        # form. Notice the upper case in email
459
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
460
                'password2': 'password', 'first_name': 'Kostas',
461
                'last_name': 'Mitroglou', 'provider': 'local'}
462
        r = self.client.post("/im/signup", data, follow=True)
463
        self.assertRedirects(r, reverse('index'))
464
        self.assertContains(r, messages.VERIFICATION_SENT)
465

    
466
        user = AstakosUser.objects.get()
467
        # previous user replaced
468
        self.assertTrue(user.activation_sent)
469
        self.assertFalse(user.email_verified)
470
        self.assertFalse(user.is_active)
471
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
472

    
473
        # hmmm, email exists; lets request a password change
474
        r = self.client.get('/im/local/password_reset')
475
        self.assertEqual(r.status_code, 200)
476
        data = {'email': 'kpap@synnefo.org'}
477
        r = self.client.post('/im/local/password_reset', data, follow=True)
478
        # she can't because account is not active yet
479
        self.assertContains(r, 'pending activation')
480

    
481
        # moderation is enabled and an activation email has already been sent
482
        # so user can trigger resend of the activation email
483
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
484
        self.assertContains(r, 'has been sent to your email address.')
485
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
486

    
487
        # also she cannot login
488
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
489
        r = self.client.post('/im/local', data, follow=True)
490
        self.assertContains(r, 'Resend activation')
491
        self.assertFalse(r.context['request'].user.is_authenticated())
492
        self.assertFalse('_pithos2_a' in self.client.cookies)
493

    
494
        # user sees the message and resends activation
495
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
496
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
497

    
498
        # logged in user cannot activate another account
499
        tmp_user = get_local_user("test_existing_user@synnefo.org")
500
        tmp_client = Client()
501
        tmp_client.login(username="test_existing_user@synnefo.org",
502
                         password="password")
503
        r = tmp_client.get(user.get_activation_url(), follow=True)
504
        self.assertContains(r, messages.LOGGED_IN_WARNING)
505

    
506
        r = self.client.get(user.get_activation_url(), follow=True)
507
        # previous code got invalidated
508
        self.assertEqual(r.status_code, 404)
509

    
510
        user = AstakosUser.objects.get(pk=user.pk)
511
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
512
        r = self.client.get(user.get_activation_url(), follow=True)
513
        self.assertRedirects(r, reverse('index'))
514
        # user sees that account is pending approval from admins
515
        self.assertContains(r, messages.NOTIFICATION_SENT)
516
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
517

    
518
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
519
        result = backend.handle_moderation(user)
520
        backend.send_result_notifications(result, user)
521
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
522
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
523

    
524
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
525
        r = self.client.get('/im/profile', follow=True)
526
        self.assertFalse(r.context['request'].user.is_authenticated())
527
        self.assertFalse('_pithos2_a' in self.client.cookies)
528
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
529

    
530
        user = AstakosUser.objects.get(pk=user.pk)
531
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
532
                                           'password': 'password'},
533
                             follow=True)
534
        # user activated and logged in, token cookie set
535
        self.assertTrue(r.context['request'].user.is_authenticated())
536
        self.assertTrue('_pithos2_a' in self.client.cookies)
537
        cookies = self.client.cookies
538
        self.assertTrue(quote(user.auth_token) in
539
                        cookies.get('_pithos2_a').value)
540
        r = self.client.get('/im/logout', follow=True)
541
        r = self.client.get('/im/')
542
        # user logged out, token cookie removed
543
        self.assertFalse(r.context['request'].user.is_authenticated())
544
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
545

    
546
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
547
        del self.client.cookies['_pithos2_a']
548

    
549
        # user can login
550
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
551
                                           'password': 'password'},
552
                             follow=True)
553
        self.assertTrue(r.context['request'].user.is_authenticated())
554
        self.assertTrue('_pithos2_a' in self.client.cookies)
555
        cookies = self.client.cookies
556
        self.assertTrue(quote(user.auth_token) in
557
                        cookies.get('_pithos2_a').value)
558
        self.client.get('/im/logout', follow=True)
559

    
560
        # user forgot password
561
        old_pass = user.password
562
        r = self.client.get('/im/local/password_reset')
563
        self.assertEqual(r.status_code, 200)
564
        r = self.client.post('/im/local/password_reset', {'email':
565
                                                          'kpap@synnefo.org'})
566
        self.assertEqual(r.status_code, 302)
567
        # email sent
568
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
569

    
570
        # user visits change password link
571
        user = AstakosUser.objects.get(pk=user.pk)
572
        r = self.client.get(user.get_password_reset_url())
573
        r = self.client.post(user.get_password_reset_url(),
574
                             {'new_password1': 'newpass',
575
                              'new_password2': 'newpass'})
576

    
577
        user = AstakosUser.objects.get(pk=user.pk)
578
        self.assertNotEqual(old_pass, user.password)
579

    
580
        # old pass is not usable
581
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
582
                                           'password': 'password'})
583
        self.assertContains(r, 'Please enter a correct username and password')
584
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
585
                                           'password': 'newpass'},
586
                             follow=True)
587
        self.assertTrue(r.context['request'].user.is_authenticated())
588
        self.client.logout()
589

    
590
        # tests of special local backends
591
        user = AstakosUser.objects.get(pk=user.pk)
592
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
593
        user.save()
594

    
595
        # non astakos local backends do not support password reset
596
        r = self.client.get('/im/local/password_reset')
597
        self.assertEqual(r.status_code, 200)
598
        r = self.client.post('/im/local/password_reset', {'email':
599
                                                          'kpap@synnefo.org'})
600
        # she can't because account is not active yet
601
        self.assertContains(r, "Changing password is not")
602

    
603

    
604
class UserActionsTests(TestCase):
605

    
606
    def test_email_change(self):
607
        # to test existing email validation
608
        get_local_user('existing@synnefo.org')
609

    
610
        # local user
611
        user = get_local_user('kpap@synnefo.org')
612

    
613
        # login as kpap
614
        self.client.login(username='kpap@synnefo.org', password='password')
615
        r = self.client.get('/im/profile', follow=True)
616
        user = r.context['request'].user
617
        self.assertTrue(user.is_authenticated())
618

    
619
        # change email is enabled
620
        r = self.client.get('/im/email_change')
621
        self.assertEqual(r.status_code, 200)
622
        self.assertFalse(user.email_change_is_pending())
623

    
624
        # request email change to an existing email fails
625
        data = {'new_email_address': 'existing@synnefo.org'}
626
        r = self.client.post('/im/email_change', data)
627
        self.assertContains(r, messages.EMAIL_USED)
628

    
629
        # proper email change
630
        data = {'new_email_address': 'kpap@gmail.com'}
631
        r = self.client.post('/im/email_change', data, follow=True)
632
        self.assertRedirects(r, '/im/profile')
633
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
634
        change1 = EmailChange.objects.get()
635

    
636
        # user sees a warning
637
        r = self.client.get('/im/email_change')
638
        self.assertEqual(r.status_code, 200)
639
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
640
        self.assertTrue(user.email_change_is_pending())
641

    
642
        # link was sent
643
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
644
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
645

    
646
        # proper email change
647
        data = {'new_email_address': 'kpap@yahoo.com'}
648
        r = self.client.post('/im/email_change', data, follow=True)
649
        self.assertRedirects(r, '/im/profile')
650
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
651
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
652
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
653
        change2 = EmailChange.objects.get()
654

    
655
        r = self.client.get(change1.get_url())
656
        self.assertEquals(r.status_code, 404)
657
        self.client.logout()
658

    
659
        invalid_client = Client()
660
        r = invalid_client.post('/im/local?',
661
                                {'username': 'existing@synnefo.org',
662
                                 'password': 'password'})
663
        r = invalid_client.get(change2.get_url(), follow=True)
664
        self.assertEquals(r.status_code, 403)
665

    
666
        r = self.client.post('/im/local?next=' + change2.get_url(),
667
                             {'username': 'kpap@synnefo.org',
668
                              'password': 'password',
669
                              'next': change2.get_url()},
670
                             follow=True)
671
        self.assertRedirects(r, '/im/profile')
672
        user = r.context['request'].user
673
        self.assertEquals(user.email, 'kpap@yahoo.com')
674
        self.assertEquals(user.username, 'kpap@yahoo.com')
675

    
676
        self.client.logout()
677
        r = self.client.post('/im/local?next=' + change2.get_url(),
678
                             {'username': 'kpap@synnefo.org',
679
                              'password': 'password',
680
                              'next': change2.get_url()},
681
                             follow=True)
682
        self.assertContains(r, "Please enter a correct username and password")
683
        self.assertEqual(user.emailchanges.count(), 0)
684

    
685

    
686
class TestAuthProviderViews(TestCase):
687

    
688
    def tearDown(self):
689
        AstakosUser.objects.all().delete()
690

    
691
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
692
                         AUTOMODERATE_POLICY=True)
693
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
694
                 FORCE_PROFILE_UPDATE=False)
695
    def test_user(self):
696
        Profile = AuthProviderPolicyProfile
697
        Pending = PendingThirdPartyUser
698
        User = AstakosUser
699

    
700
        User.objects.create(email="newuser@synnefo.org")
701
        get_local_user("olduser@synnefo.org")
702
        cl_olduser = ShibbolethClient()
703
        get_local_user("olduser2@synnefo.org")
704
        ShibbolethClient()
705
        cl_newuser = ShibbolethClient()
706
        cl_newuser2 = Client()
707

    
708
        academic_group, created = Group.objects.get_or_create(
709
            name='academic-login')
710
        academic_users = academic_group.user_set
711
        assert created
712
        policy_only_academic = Profile.objects.add_policy('academic_strict',
713
                                                          'shibboleth',
714
                                                          academic_group,
715
                                                          exclusive=True,
716
                                                          login=False,
717
                                                          add=False)
718

    
719
        # new academic user
720
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
721
        cl_newuser.set_tokens(eppn="newusereppn")
722
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
723
        pending = Pending.objects.get()
724
        identifier = pending.third_party_identifier
725
        signup_data = {'third_party_identifier': identifier,
726
                       'first_name': 'Academic',
727
                       'third_party_token': pending.token,
728
                       'last_name': 'New User',
729
                       'provider': 'shibboleth'}
730
        r = cl_newuser.post('/im/signup', signup_data)
731
        self.assertContains(r, "This field is required", )
732
        signup_data['email'] = 'olduser@synnefo.org'
733
        r = cl_newuser.post('/im/signup', signup_data)
734
        self.assertContains(r, "already an account with this email", )
735
        signup_data['email'] = 'newuser@synnefo.org'
736
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
737
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
738
        self.assertEqual(r.status_code, 404)
739
        newuser = User.objects.get(email="newuser@synnefo.org")
740
        activation_link = newuser.get_activation_url()
741
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
742

    
743
        # new non-academic user
744
        signup_data = {'first_name': 'Non Academic',
745
                       'last_name': 'New User',
746
                       'provider': 'local',
747
                       'password1': 'password',
748
                       'password2': 'password'}
749
        signup_data['email'] = 'olduser@synnefo.org'
750
        r = cl_newuser2.post('/im/signup', signup_data)
751
        self.assertContains(r, 'There is already an account with this '
752
                               'email address')
753
        signup_data['email'] = 'newuser@synnefo.org'
754
        r = cl_newuser2.post('/im/signup/', signup_data)
755
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
756
        r = self.client.get(activation_link, follow=True)
757
        self.assertEqual(r.status_code, 404)
758
        newuser = User.objects.get(email="newuser@synnefo.org")
759
        self.assertTrue(newuser.activation_sent)
760

    
761
        # activation sent, user didn't open verification url so additional
762
        # registrations invalidate the previous signups.
763
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
764
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
765
        pending = Pending.objects.get()
766
        identifier = pending.third_party_identifier
767
        signup_data = {'third_party_identifier': identifier,
768
                       'first_name': 'Academic',
769
                       'third_party_token': pending.token,
770
                       'last_name': 'New User',
771
                       'provider': 'shibboleth'}
772
        signup_data['email'] = 'newuser@synnefo.org'
773
        r = cl_newuser.post('/im/signup', signup_data)
774
        self.assertEqual(r.status_code, 302)
775
        newuser = User.objects.get(email="newuser@synnefo.org")
776
        self.assertTrue(newuser.activation_sent)
777
        activation_link = newuser.get_activation_url()
778
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
779
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
780
        self.assertRedirects(r, '/im/landing')
781
        newuser = User.objects.get(email="newuser@synnefo.org")
782
        self.assertEqual(newuser.is_active, True)
783
        self.assertEqual(newuser.email_verified, True)
784
        cl_newuser.logout()
785

    
786
        # cannot reactivate if suspended
787
        newuser.is_active = False
788
        newuser.save()
789
        r = cl_newuser.get(newuser.get_activation_url())
790
        newuser = User.objects.get(email="newuser@synnefo.org")
791
        self.assertFalse(newuser.is_active)
792

    
793
        # release suspension
794
        newuser.is_active = True
795
        newuser.save()
796

    
797
        cl_newuser.get('/im/login/shibboleth?', follow=True)
798
        local = auth.get_provider('local', newuser)
799
        self.assertEqual(local.get_add_policy, False)
800
        self.assertEqual(local.get_login_policy, False)
801
        r = cl_newuser.get(local.get_add_url, follow=True)
802
        self.assertRedirects(r, '/im/profile')
803
        self.assertContains(r, 'disabled for your')
804

    
805
        cl_olduser.login(username='olduser@synnefo.org', password="password")
806
        r = cl_olduser.get('/im/profile', follow=True)
807
        self.assertEqual(r.status_code, 200)
808
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
809
        self.assertContains(r, 'Your request is missing a unique token')
810
        cl_olduser.set_tokens(eppn="newusereppn")
811
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
812
        self.assertContains(r, 'already in use')
813
        cl_olduser.set_tokens(eppn="oldusereppn")
814
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
815
        self.assertContains(r, 'Academic login enabled for this account')
816

    
817
        user = User.objects.get(email="olduser@synnefo.org")
818
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
819
        local_provider = user.get_auth_provider('local')
820
        self.assertEqual(shib_provider.get_remove_policy, True)
821
        self.assertEqual(local_provider.get_remove_policy, True)
822

    
823
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
824
                                                          'shibboleth',
825
                                                          academic_group,
826
                                                          remove=False)
827
        user.groups.add(academic_group)
828
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
829
        local_provider = user.get_auth_provider('local')
830
        self.assertEqual(shib_provider.get_remove_policy, False)
831
        self.assertEqual(local_provider.get_remove_policy, True)
832
        self.assertEqual(local_provider.get_login_policy, False)
833

    
834
        cl_olduser.logout()
835
        login_data = {'username': 'olduser@synnefo.org',
836
                      'password': 'password'}
837
        r = cl_olduser.post('/im/local', login_data, follow=True)
838
        self.assertContains(r, "href='/im/login/shibboleth'>Academic login")
839
        Group.objects.all().delete()
840

    
841

    
842
class TestAuthProvidersAPI(TestCase):
843
    """
844
    Test auth_providers module API
845
    """
846

    
847
    def tearDown(self):
848
        Group.objects.all().delete()
849

    
850
    @im_settings(IM_MODULES=['local', 'shibboleth'])
851
    def test_create(self):
852
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
853
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
854

    
855
        module = 'shibboleth'
856
        identifier = 'SHIB_UUID'
857
        provider_params = {
858
            'affiliation': 'UNIVERSITY',
859
            'info': {'age': 27}
860
        }
861
        provider = auth.get_provider(module, user2, identifier,
862
                                     **provider_params)
863
        provider.add_to_user()
864
        provider = auth.get_provider(module, user, identifier,
865
                                     **provider_params)
866
        provider.add_to_user()
867
        user.email_verified = True
868
        user.save()
869
        self.assertRaises(Exception, provider.add_to_user)
870
        provider = user.get_auth_provider(module, identifier)
871
        self.assertEqual(user.get_auth_provider(
872
            module, identifier)._instance.info.get('age'), 27)
873

    
874
        module = 'local'
875
        identifier = None
876
        provider_params = {'auth_backend': 'ldap', 'info':
877
                          {'office': 'A1'}}
878
        provider = auth.get_provider(module, user, identifier,
879
                                     **provider_params)
880
        provider.add_to_user()
881
        self.assertFalse(provider.get_add_policy)
882
        self.assertRaises(Exception, provider.add_to_user)
883

    
884
        shib = user.get_auth_provider('shibboleth',
885
                                      'SHIB_UUID')
886
        self.assertTrue(shib.get_remove_policy)
887

    
888
        local = user.get_auth_provider('local')
889
        self.assertTrue(local.get_remove_policy)
890

    
891
        local.remove_from_user()
892
        self.assertFalse(shib.get_remove_policy)
893
        self.assertRaises(Exception, shib.remove_from_user)
894

    
895
        provider = user.get_auth_providers()[0]
896
        self.assertRaises(Exception, provider.add_to_user)
897

    
898
    @im_settings(IM_MODULES=['local', 'shibboleth'])
899
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
900
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
901
                                                 'group2'])
902
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
903
                        CREATION_GROUPS_POLICY=['localgroup-create',
904
                                                'group-create'])
905
    def test_add_groups(self):
906
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
907
        provider = auth.get_provider('shibboleth', user, 'test123')
908
        provider.add_to_user()
909
        user = AstakosUser.objects.get()
910
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
911
                              sorted([u'group1', u'group2', u'group-create']))
912

    
913
        local = auth.get_provider('local', user)
914
        local.add_to_user()
915
        provider = user.get_auth_provider('shibboleth')
916
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
917
        provider.remove_from_user()
918
        user = AstakosUser.objects.get()
919
        self.assertEqual(len(user.get_auth_providers()), 1)
920
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
921
                              sorted([u'group-create', u'localgroup']))
922

    
923
        local = user.get_auth_provider('local')
924
        self.assertRaises(Exception, local.remove_from_user)
925
        provider = auth.get_provider('shibboleth', user, 'test123')
926
        provider.add_to_user()
927
        user = AstakosUser.objects.get()
928
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
929
                              sorted([u'group-create', u'group1', u'group2',
930
                               u'localgroup']))
931
        Group.objects.all().delete()
932

    
933

    
934

    
935
    @im_settings(IM_MODULES=['local', 'shibboleth'])
936
    def test_policies(self):
937
        group_old, created = Group.objects.get_or_create(name='olduser')
938

    
939
        astakos_settings.MODERATION_ENABLED = True
940
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
941
            ['academic-user']
942
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
943
            ['google-user']
944

    
945
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
946
        user.groups.add(group_old)
947
        user.add_auth_provider('local')
948

    
949
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
950
        user2.add_auth_provider('shibboleth', identifier='shibid')
951

    
952
        user3 = AstakosUser.objects.create(email="kpap3@synnefo.org")
953
        user3.groups.add(group_old)
954
        user3.add_auth_provider('local')
955
        user3.add_auth_provider('shibboleth', identifier='1234')
956

    
957
        self.assertTrue(user2.groups.get(name='academic-user'))
958
        self.assertFalse(user2.groups.filter(name='olduser').count())
959

    
960
        local = auth_providers.get_provider('local')
961
        self.assertTrue(local.get_add_policy)
962

    
963
        academic_group = Group.objects.get(name='academic-user')
964
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
965
                                                     academic_group,
966
                                                     exclusive=True,
967
                                                     add=False,
968
                                                     login=False)
969
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
970
                                                     academic_group,
971
                                                     exclusive=True,
972
                                                     login=False,
973
                                                     add=False)
974
        # no duplicate entry gets created
975
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
976

    
977
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
978
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
979
                                                     user2,
980
                                                     remove=False)
981
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
982

    
983
        local = auth_providers.get_provider('local', user2)
984
        google = auth_providers.get_provider('google', user2)
985
        shibboleth = auth_providers.get_provider('shibboleth', user2)
986
        self.assertTrue(shibboleth.get_login_policy)
987
        self.assertFalse(shibboleth.get_remove_policy)
988
        self.assertFalse(local.get_add_policy)
989
        self.assertFalse(local.get_add_policy)
990
        self.assertFalse(google.get_add_policy)
991

    
992
        user2.groups.remove(Group.objects.get(name='academic-user'))
993
        self.assertTrue(local.get_add_policy)
994
        self.assertTrue(google.get_add_policy)
995
        user2.groups.add(Group.objects.get(name='academic-user'))
996

    
997
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
998
                                                     user2,
999
                                                     exclusive=True,
1000
                                                     add=True)
1001
        self.assertTrue(local.get_add_policy)
1002
        self.assertTrue(google.get_add_policy)
1003

    
1004
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1005
        self.assertFalse(local.get_automoderate_policy)
1006
        self.assertFalse(google.get_automoderate_policy)
1007
        self.assertTrue(shibboleth.get_automoderate_policy)
1008

    
1009
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1010
                  'GOOGLE_ADD_GROUPS_POLICY']:
1011
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1012

    
1013

    
1014
    @shibboleth_settings(CREATE_POLICY=True)
1015
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1016
    def test_create_http(self):
1017
        # this should be wrapped inside a transaction
1018
        user = AstakosUser(email="test@test.com")
1019
        user.save()
1020
        provider = auth_providers.get_provider('shibboleth', user,
1021
                                               'test@academia.test')
1022
        provider.add_to_user()
1023
        user.get_auth_provider('shibboleth', 'test@academia.test')
1024
        provider = auth_providers.get_provider('local', user)
1025
        provider.add_to_user()
1026
        user.get_auth_provider('local')
1027

    
1028
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1029
        user = AstakosUser(email="test2@test.com")
1030
        user.save()
1031
        provider = auth_providers.get_provider('shibboleth', user,
1032
                                               'test@shibboleth.com',
1033
                                               **{'info': {'name':
1034
                                                                'User Test'}})
1035
        self.assertFalse(provider.get_create_policy)
1036
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1037
        self.assertTrue(provider.get_create_policy)
1038
        academic = provider.add_to_user()
1039

    
1040
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1041
    @shibboleth_settings(LIMIT_POLICY=2)
1042
    def test_policies(self):
1043
        user = get_local_user('kpap@synnefo.org')
1044
        user.add_auth_provider('shibboleth', identifier='1234')
1045
        user.add_auth_provider('shibboleth', identifier='12345')
1046

    
1047
        # default limit is 1
1048
        local = user.get_auth_provider('local')
1049
        self.assertEqual(local.get_add_policy, False)
1050

    
1051
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1052
        academic = user.get_auth_provider('shibboleth',
1053
                                          identifier='1234')
1054
        self.assertEqual(academic.get_add_policy, False)
1055
        newacademic = auth_providers.get_provider('shibboleth', user,
1056
                                                  identifier='123456')
1057
        self.assertEqual(newacademic.get_add_policy, True)
1058
        user.add_auth_provider('shibboleth', identifier='123456')
1059
        self.assertEqual(academic.get_add_policy, False)
1060
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1061

    
1062
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1063
    @shibboleth_settings(LIMIT_POLICY=2)
1064
    def test_messages(self):
1065
        user = get_local_user('kpap@synnefo.org')
1066
        user.add_auth_provider('shibboleth', identifier='1234')
1067
        user.add_auth_provider('shibboleth', identifier='12345')
1068
        provider = auth_providers.get_provider('shibboleth')
1069
        self.assertEqual(provider.get_message('title'), 'Academic')
1070
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1071
        # regenerate messages cache
1072
        provider = auth_providers.get_provider('shibboleth')
1073
        self.assertEqual(provider.get_message('title'), 'New title')
1074
        self.assertEqual(provider.get_message('login_title'),
1075
                         'New title LOGIN')
1076
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1077
        self.assertEqual(provider.get_module_icon,
1078
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1079
        self.assertEqual(provider.get_module_medium_icon,
1080
                         settings.MEDIA_URL +
1081
                         'im/auth/icons-medium/shibboleth.png')
1082

    
1083
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1084
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1085
        self.assertEqual(provider.get_method_details_msg,
1086
                         'Account: 12345')
1087
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1088
        self.assertEqual(provider.get_method_details_msg,
1089
                         'Account: 1234')
1090

    
1091
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1092
        self.assertEqual(provider.get_not_active_msg,
1093
                         "'Academic login' is disabled.")
1094

    
1095
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1096
    @shibboleth_settings(LIMIT_POLICY=2)
1097
    def test_templates(self):
1098
        user = get_local_user('kpap@synnefo.org')
1099
        user.add_auth_provider('shibboleth', identifier='1234')
1100
        user.add_auth_provider('shibboleth', identifier='12345')
1101

    
1102
        provider = auth_providers.get_provider('shibboleth')
1103
        self.assertEqual(provider.get_template('login'),
1104
                         'im/auth/shibboleth_login.html')
1105
        provider = auth_providers.get_provider('google')
1106
        self.assertEqual(provider.get_template('login'),
1107
                         'im/auth/generic_login.html')
1108

    
1109

    
1110
class TestActivationBackend(TestCase):
1111

    
1112
    def setUp(self):
1113
        # dummy call to pass through logging middleware
1114
        self.client.get('/im/')
1115

    
1116
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1117
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1118
    def test_policies(self):
1119
        backend = activation_backends.get_backend()
1120

    
1121
        # email matches RE_USER_EMAIL_PATTERNS
1122
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1123
                               is_active=False, email_verified=False)
1124
        backend.handle_verification(user1, user1.verification_code)
1125
        self.assertEqual(user1.accepted_policy, 'email')
1126

    
1127
        # manually moderated
1128
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1129
                               is_active=False, email_verified=False)
1130

    
1131
        backend.handle_verification(user2, user2.verification_code)
1132
        self.assertEqual(user2.moderated, False)
1133
        backend.handle_moderation(user2)
1134
        self.assertEqual(user2.moderated, True)
1135
        self.assertEqual(user2.accepted_policy, 'manual')
1136

    
1137
        # autoaccept due to provider automoderate policy
1138
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1139
                               is_active=False, email_verified=False)
1140
        user3.auth_providers.all().delete()
1141
        user3.add_auth_provider('shibboleth', identifier='shib123')
1142
        backend.handle_verification(user3, user3.verification_code)
1143
        self.assertEqual(user3.moderated, True)
1144
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1145

    
1146
    @im_settings(MODERATION_ENABLED=False,
1147
                 MANAGERS=(('Manager',
1148
                            'manager@synnefo.org'),),
1149
                 HELPDESK=(('Helpdesk',
1150
                            'helpdesk@synnefo.org'),),
1151
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1152
    def test_without_moderation(self):
1153
        backend = activation_backends.get_backend()
1154
        form = backend.get_signup_form('local')
1155
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1156

    
1157
        user_data = {
1158
            'email': 'kpap@synnefo.org',
1159
            'first_name': 'Kostas Papas',
1160
            'password1': '123',
1161
            'password2': '123'
1162
        }
1163
        form = backend.get_signup_form('local', user_data)
1164
        user = form.save(commit=False)
1165
        form.store_user(user)
1166
        self.assertEqual(user.is_active, False)
1167
        self.assertEqual(user.email_verified, False)
1168

    
1169
        # step one, registration
1170
        result = backend.handle_registration(user)
1171
        user = AstakosUser.objects.get()
1172
        self.assertEqual(user.is_active, False)
1173
        self.assertEqual(user.email_verified, False)
1174
        self.assertTrue(user.verification_code)
1175
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1176
        backend.send_result_notifications(result, user)
1177
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1178
        self.assertEqual(len(mail.outbox), 1)
1179

    
1180
        # step two, verify email (automatically
1181
        # moderates/accepts user, since moderation is disabled)
1182
        user = AstakosUser.objects.get()
1183
        valid_code = user.verification_code
1184

    
1185
        # test invalid code
1186
        result = backend.handle_verification(user, valid_code)
1187
        backend.send_result_notifications(result, user)
1188
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1189
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1190
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1191
        # verification + activated + greeting = 3
1192
        self.assertEqual(len(mail.outbox), 3)
1193
        user = AstakosUser.objects.get()
1194
        self.assertEqual(user.is_active, True)
1195
        self.assertEqual(user.moderated, True)
1196
        self.assertTrue(user.moderated_at)
1197
        self.assertEqual(user.email_verified, True)
1198
        self.assertTrue(user.activation_sent)
1199

    
1200
    @im_settings(MODERATION_ENABLED=True,
1201
                 MANAGERS=(('Manager',
1202
                            'manager@synnefo.org'),),
1203
                 HELPDESK=(('Helpdesk',
1204
                            'helpdesk@synnefo.org'),),
1205
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1206
    def test_with_moderation(self):
1207

    
1208
        backend = activation_backends.get_backend()
1209
        form = backend.get_signup_form('local')
1210
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1211

    
1212
        user_data = {
1213
            'email': 'kpap@synnefo.org',
1214
            'first_name': 'Kostas Papas',
1215
            'password1': '123',
1216
            'password2': '123'
1217
        }
1218
        form = backend.get_signup_form(provider='local',
1219
                                       initial_data=user_data)
1220
        user = form.save(commit=False)
1221
        form.store_user(user)
1222
        self.assertEqual(user.is_active, False)
1223
        self.assertEqual(user.email_verified, False)
1224

    
1225
        # step one, registration
1226
        result = backend.handle_registration(user)
1227
        user = AstakosUser.objects.get()
1228
        self.assertEqual(user.is_active, False)
1229
        self.assertEqual(user.email_verified, False)
1230
        self.assertTrue(user.verification_code)
1231
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1232
        backend.send_result_notifications(result, user)
1233
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1234
        self.assertEqual(len(mail.outbox), 1)
1235

    
1236
        # step two, verifying email
1237
        user = AstakosUser.objects.get()
1238
        valid_code = user.verification_code
1239
        invalid_code = user.verification_code + 'invalid'
1240

    
1241
        # test invalid code
1242
        result = backend.handle_verification(user, invalid_code)
1243
        self.assertEqual(result.status, backend.Result.ERROR)
1244
        backend.send_result_notifications(result, user)
1245
        user = AstakosUser.objects.get()
1246
        self.assertEqual(user.is_active, False)
1247
        self.assertEqual(user.moderated, False)
1248
        self.assertEqual(user.moderated_at, None)
1249
        self.assertEqual(user.email_verified, False)
1250
        self.assertTrue(user.activation_sent)
1251

    
1252
        # test valid code
1253
        user = AstakosUser.objects.get()
1254
        result = backend.handle_verification(user, valid_code)
1255
        backend.send_result_notifications(result, user)
1256
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1257
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1258
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1259
        self.assertEqual(len(mail.outbox), 2)
1260
        user = AstakosUser.objects.get()
1261
        self.assertEqual(user.moderated, False)
1262
        self.assertEqual(user.moderated_at, None)
1263
        self.assertEqual(user.email_verified, True)
1264
        self.assertTrue(user.activation_sent)
1265

    
1266
        # test code reuse
1267
        result = backend.handle_verification(user, valid_code)
1268
        self.assertEqual(result.status, backend.Result.ERROR)
1269
        user = AstakosUser.objects.get()
1270
        self.assertEqual(user.is_active, False)
1271
        self.assertEqual(user.moderated, False)
1272
        self.assertEqual(user.moderated_at, None)
1273
        self.assertEqual(user.email_verified, True)
1274
        self.assertTrue(user.activation_sent)
1275

    
1276
        # valid code on verified user
1277
        user = AstakosUser.objects.get()
1278
        valid_code = user.verification_code
1279
        result = backend.handle_verification(user, valid_code)
1280
        self.assertEqual(result.status, backend.Result.ERROR)
1281

    
1282
        # step three, moderation user
1283
        user = AstakosUser.objects.get()
1284
        result = backend.handle_moderation(user)
1285
        backend.send_result_notifications(result, user)
1286

    
1287
        user = AstakosUser.objects.get()
1288
        self.assertEqual(user.is_active, True)
1289
        self.assertEqual(user.moderated, True)
1290
        self.assertTrue(user.moderated_at)
1291
        self.assertEqual(user.email_verified, True)
1292
        self.assertTrue(user.activation_sent)