Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ c34abd9c

History | View | Annotate | Download (56.8 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from astakos.im.tests.common import *
35

    
36

    
37
class ShibbolethTests(TestCase):
38
    """
39
    Testing shibboleth authentication.
40
    """
41

    
42
    def setUp(self):
43
        self.client = ShibbolethClient()
44
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
45
        astakos_settings.MODERATION_ENABLED = True
46

    
47
    def tearDown(self):
48
        AstakosUser.objects.all().delete()
49

    
50
    @im_settings(FORCE_PROFILE_UPDATE=False)
51
    def test_create_account(self):
52

    
53
        client = ShibbolethClient()
54

    
55
        # shibboleth views validation
56
        # eepn required
57
        r = client.get('/im/login/shibboleth?', follow=True)
58
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
59
            'domain': astakos_settings.BASEURL,
60
            'contact_email': settings.CONTACT_EMAIL
61
        })
62
        client.set_tokens(eppn="kpapeppn")
63

    
64
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
65
        # shibboleth user info required
66
        r = client.get('/im/login/shibboleth?', follow=True)
67
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
68
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
69

    
70
        # shibboleth logged us in
71
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
72
                          cn="Kostas Papadimitriou",
73
                          ep_affiliation="Test Affiliation")
74
        r = client.get('/im/login/shibboleth?', follow=True)
75
        token = PendingThirdPartyUser.objects.get().token
76
        self.assertRedirects(r, '/im/signup?third_party_token=%s' % token)
77
        self.assertEqual(r.status_code, 200)
78

    
79
        # a new pending user created
80
        pending_user = PendingThirdPartyUser.objects.get(
81
            third_party_identifier="kpapeppn")
82
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
83
        # keep the token for future use
84
        token = pending_user.token
85
        # from now on no shibboleth headers are sent to the server
86
        client.reset_tokens()
87

    
88
        # this is the old way, it should fail, to avoid pending user take over
89
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
90
        self.assertEqual(r.status_code, 404)
91

    
92
        # this is the signup unique url associated with the pending user
93
        # created
94
        r = client.get('/im/signup/?third_party_token=%s' % token)
95
        identifier = pending_user.third_party_identifier
96
        post_data = {'third_party_identifier': identifier,
97
                     'first_name': 'Kostas',
98
                     'third_party_token': token,
99
                     'last_name': 'Mitroglou',
100
                     'provider': 'shibboleth'}
101

    
102
        signup_url = reverse('signup')
103

    
104
        # invlid email
105
        post_data['email'] = 'kpap'
106
        r = client.post(signup_url, post_data)
107
        self.assertContains(r, token)
108

    
109
        # existing email
110
        existing_user = get_local_user('test@test.com')
111
        post_data['email'] = 'test@test.com'
112
        r = client.post(signup_url, post_data)
113
        self.assertContains(r, messages.EMAIL_USED)
114
        existing_user.delete()
115

    
116
        # and finally a valid signup
117
        post_data['email'] = 'kpap@synnefo.org'
118
        r = client.post(signup_url, post_data, follow=True)
119
        self.assertContains(r, messages.VERIFICATION_SENT)
120

    
121
        # entires commited as expected
122
        self.assertEqual(AstakosUser.objects.count(), 1)
123
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
124
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
125

    
126
        # provider info stored
127
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
128
        self.assertEqual(provider.affiliation, 'Test Affiliation')
129
        self.assertEqual(provider.info, {u'email': u'kpap@synnefo.org',
130
                                         u'eppn': u'kpapeppn',
131
                                         u'name': u'Kostas Papadimitriou'})
132

    
133
        # login (not activated yet)
134
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
135
                          cn="Kostas Papadimitriou", )
136
        r = client.get("/im/login/shibboleth?", follow=True)
137
        self.assertContains(r, 'is pending moderation')
138

    
139
        # admin activates the user
140
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
141
        backend = activation_backends.get_backend()
142
        activation_result = backend.verify_user(u, u.verification_code)
143
        activation_result = backend.accept_user(u)
144
        self.assertFalse(activation_result.is_error())
145
        backend.send_result_notifications(activation_result, u)
146
        self.assertEqual(u.is_active, True)
147

    
148
        # we see our profile
149
        r = client.get("/im/login/shibboleth?", follow=True)
150
        self.assertRedirects(r, '/im/landing')
151
        self.assertEqual(r.status_code, 200)
152

    
153
    def test_existing(self):
154
        """
155
        Test adding of third party login to an existing account
156
        """
157

    
158
        # this is our existing user
159
        existing_user = get_local_user('kpap@synnefo.org')
160
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
161
        existing_inactive.is_active = False
162
        existing_inactive.save()
163

    
164
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
165
        existing_unverified.is_active = False
166
        existing_unverified.activation_sent = None
167
        existing_unverified.email_verified = False
168
        existing_unverified.is_verified = False
169
        existing_unverified.save()
170

    
171
        client = ShibbolethClient()
172
        # shibboleth logged us in, notice that we use different email
173
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
174
                          cn="Kostas Papadimitriou", )
175
        r = client.get("/im/login/shibboleth?", follow=True)
176

    
177
        # a new pending user created
178
        pending_user = PendingThirdPartyUser.objects.get()
179
        token = pending_user.token
180
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
181
        pending_key = pending_user.token
182
        client.reset_tokens()
183
        self.assertRedirects(r, "/im/signup?third_party_token=%s" % token)
184

    
185
        form = r.context['login_form']
186
        signupdata = copy.copy(form.initial)
187
        signupdata['email'] = 'kpap@synnefo.org'
188
        signupdata['third_party_token'] = token
189
        signupdata['provider'] = 'shibboleth'
190
        signupdata.pop('id', None)
191

    
192
        # the email exists to another user
193
        r = client.post("/im/signup", signupdata)
194
        self.assertContains(r, "There is already an account with this email "
195
                               "address")
196
        # change the case, still cannot create
197
        signupdata['email'] = 'KPAP@synnefo.org'
198
        r = client.post("/im/signup", signupdata)
199
        self.assertContains(r, "There is already an account with this email "
200
                               "address")
201
        # inactive user
202
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
203
        r = client.post("/im/signup", signupdata)
204
        self.assertContains(r, "There is already an account with this email "
205
                               "address")
206

    
207
        # unverified user, this should pass, old entry will be deleted
208
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
209
        r = client.post("/im/signup", signupdata)
210

    
211
        post_data = {'password': 'password',
212
                     'username': 'kpap@synnefo.org'}
213
        r = client.post('/im/local', post_data, follow=True)
214
        self.assertTrue(r.context['request'].user.is_authenticated())
215
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
216
                          cn="Kostas Papadimitriou", )
217
        r = client.get("/im/login/shibboleth?", follow=True)
218
        self.assertContains(r, "enabled for this account")
219
        client.reset_tokens()
220

    
221
        user = existing_user
222
        self.assertTrue(user.has_auth_provider('shibboleth'))
223
        self.assertTrue(user.has_auth_provider('local',
224
                                               auth_backend='astakos'))
225
        client.logout()
226

    
227
        # look Ma, i can login with both my shibboleth and local account
228
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
229
                          cn="Kostas Papadimitriou")
230
        r = client.get("/im/login/shibboleth?", follow=True)
231
        self.assertTrue(r.context['request'].user.is_authenticated())
232
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
233
        self.assertRedirects(r, '/im/landing')
234
        self.assertEqual(r.status_code, 200)
235
        client.logout()
236
        client.reset_tokens()
237

    
238
        # logged out
239
        r = client.get("/im/profile", follow=True)
240
        self.assertFalse(r.context['request'].user.is_authenticated())
241

    
242
        # login with local account also works
243
        post_data = {'password': 'password',
244
                     'username': 'kpap@synnefo.org'}
245
        r = self.client.post('/im/local', post_data, follow=True)
246
        self.assertTrue(r.context['request'].user.is_authenticated())
247
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
248
        self.assertRedirects(r, '/im/landing')
249
        self.assertEqual(r.status_code, 200)
250

    
251
        # cannot add the same eppn
252
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
253
                          cn="Kostas Papadimitriou", )
254
        r = client.get("/im/login/shibboleth?", follow=True)
255
        self.assertRedirects(r, '/im/landing')
256
        self.assertTrue(r.status_code, 200)
257
        self.assertEquals(existing_user.auth_providers.count(), 2)
258

    
259
        # only one allowed by default
260
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
261
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
262
        prov = auth_providers.get_provider('shibboleth')
263
        r = client.get("/im/login/shibboleth?", follow=True)
264
        self.assertContains(r, "Failed to add")
265
        self.assertRedirects(r, '/im/profile')
266
        self.assertTrue(r.status_code, 200)
267
        self.assertEquals(existing_user.auth_providers.count(), 2)
268
        client.logout()
269
        client.reset_tokens()
270

    
271
        # cannot login with another eppn
272
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppninvalid",
273
                          cn="Kostas Papadimitriou")
274
        r = client.get("/im/login/shibboleth?", follow=True)
275
        self.assertFalse(r.context['request'].user.is_authenticated())
276

    
277
        # cannot
278

    
279
        # lets remove local password
280
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
281
                                       email="kpap@synnefo.org")
282
        remove_local_url = user.get_auth_provider('local').get_remove_url
283
        remove_shibbo_url = user.get_auth_provider('shibboleth',
284
                                                   'kpapeppn').get_remove_url
285
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
286
                          cn="Kostas Papadimtriou")
287
        r = client.get("/im/login/shibboleth?", follow=True)
288
        client.reset_tokens()
289

    
290
        # TODO: this view should use POST
291
        r = client.get(remove_local_url)
292
        # 2 providers left
293
        self.assertEqual(user.auth_providers.count(), 1)
294
        # cannot remove last provider
295
        r = client.get(remove_shibbo_url)
296
        self.assertEqual(r.status_code, 403)
297
        self.client.logout()
298

    
299
        # cannot login using local credentials (notice we use another client)
300
        post_data = {'password': 'password',
301
                     'username': 'kpap@synnefo.org'}
302
        r = self.client.post('/im/local', post_data, follow=True)
303
        self.assertFalse(r.context['request'].user.is_authenticated())
304

    
305
        # we can reenable the local provider by setting a password
306
        r = client.get("/im/password_change", follow=True)
307
        r = client.post("/im/password_change", {'new_password1': '111',
308
                                                'new_password2': '111'},
309
                        follow=True)
310
        user = r.context['request'].user
311
        self.assertTrue(user.has_auth_provider('local'))
312
        self.assertTrue(user.has_auth_provider('shibboleth'))
313
        self.assertTrue(user.check_password('111'))
314
        self.assertTrue(user.has_usable_password())
315
        self.client.logout()
316

    
317
        # now we can login
318
        post_data = {'password': '111',
319
                     'username': 'kpap@synnefo.org'}
320
        r = self.client.post('/im/local', post_data, follow=True)
321
        self.assertTrue(r.context['request'].user.is_authenticated())
322

    
323
        client.reset_tokens()
324

    
325
        # we cannot take over another shibboleth identifier
326
        user2 = get_local_user('another@synnefo.org')
327
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
328
        # login
329
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
330
                          cn="Kostas Papadimitriou")
331
        r = client.get("/im/login/shibboleth?", follow=True)
332
        # try to assign existing shibboleth identifier of another user
333
        client.set_tokens(mail="kpap_second@shibboleth.gr",
334
                          eppn="existingeppn", cn="Kostas Papadimitriou")
335
        r = client.get("/im/login/shibboleth?", follow=True)
336
        self.assertContains(r, "is already in use")
337

    
338

    
339
class TestLocal(TestCase):
340

    
341
    def setUp(self):
342
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
343
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
344
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
345
        settings.ASTAKOS_MODERATION_ENABLED = True
346

    
347
    def tearDown(self):
348
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
349
        AstakosUser.objects.all().delete()
350

    
351
    def test_no_moderation(self):
352
        # disable moderation
353
        astakos_settings.MODERATION_ENABLED = False
354

    
355
        # create a new user
356
        r = self.client.get("/im/signup")
357
        self.assertEqual(r.status_code, 200)
358
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
359
                'password2': 'password', 'first_name': 'Kostas',
360
                'last_name': 'Mitroglou', 'provider': 'local'}
361
        r = self.client.post("/im/signup", data)
362

    
363
        # user created
364
        self.assertEqual(AstakosUser.objects.count(), 1)
365
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
366
                                       email="kpap@synnefo.org")
367
        self.assertEqual(user.username, 'kpap@synnefo.org')
368
        self.assertEqual(user.has_auth_provider('local'), True)
369
        self.assertFalse(user.is_active)
370

    
371
        # user (but not admin) gets notified
372
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
373
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
374
        astakos_settings.MODERATION_ENABLED = True
375

    
376
    def test_email_case(self):
377
        data = {
378
            'email': 'kPap@synnefo.org',
379
            'password1': '1234',
380
            'password2': '1234'
381
        }
382

    
383
        form = forms.LocalUserCreationForm(data)
384
        self.assertTrue(form.is_valid())
385
        user = form.save()
386
        form.store_user(user, {})
387

    
388
        u = AstakosUser.objects.get()
389
        self.assertEqual(u.email, 'kPap@synnefo.org')
390
        self.assertEqual(u.username, 'kpap@synnefo.org')
391
        u.is_active = True
392
        u.email_verified = True
393
        u.save()
394

    
395
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
396
        login = forms.LoginForm(data=data)
397
        self.assertTrue(login.is_valid())
398

    
399
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
400
        login = forms.LoginForm(data=data)
401
        self.assertTrue(login.is_valid())
402

    
403
        data = {
404
            'email': 'kpap@synnefo.org',
405
            'password1': '1234',
406
            'password2': '1234'
407
        }
408
        form = forms.LocalUserCreationForm(data)
409
        self.assertFalse(form.is_valid())
410

    
411
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
412
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
413
    def test_local_provider(self):
414
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
415

    
416
        # create a user
417
        r = self.client.get("/im/signup")
418
        self.assertEqual(r.status_code, 200)
419
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
420
                'password2': 'password', 'first_name': 'Kostas',
421
                'last_name': 'Mitroglou', 'provider': 'local'}
422
        r = self.client.post("/im/signup", data)
423

    
424
        # user created
425
        self.assertEqual(AstakosUser.objects.count(), 1)
426
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
427
                                       email="kpap@synnefo.org")
428
        self.assertEqual(user.username, 'kpap@synnefo.org')
429
        self.assertEqual(user.has_auth_provider('local'), True)
430
        self.assertFalse(user.is_active)  # not activated
431
        self.assertFalse(user.email_verified)  # not verified
432
        self.assertTrue(user.activation_sent)  # activation automatically sent
433
        self.assertFalse(user.moderated)
434
        self.assertFalse(user.email_verified)
435

    
436
        # admin gets notified and activates the user from the command line
437
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
438
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
439
                                           'password': 'password'},
440
                             follow=True)
441
        self.assertContains(r, messages.VERIFICATION_SENT)
442
        backend = activation_backends.get_backend()
443

    
444
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
445
        backend.send_user_verification_email(user)
446

    
447
        # user activation fields updated and user gets notified via email
448
        user = AstakosUser.objects.get(pk=user.pk)
449
        self.assertTrue(user.activation_sent)
450
        self.assertFalse(user.email_verified)
451
        self.assertFalse(user.is_active)
452
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
453

    
454
        # user forgot she got registered and tries to submit registration
455
        # form. Notice the upper case in email
456
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
457
                'password2': 'password', 'first_name': 'Kostas',
458
                'last_name': 'Mitroglou', 'provider': 'local'}
459
        r = self.client.post("/im/signup", data, follow=True)
460
        self.assertRedirects(r, reverse('index'))
461
        self.assertContains(r, messages.VERIFICATION_SENT)
462

    
463
        user = AstakosUser.objects.get()
464
        # previous user replaced
465
        self.assertTrue(user.activation_sent)
466
        self.assertFalse(user.email_verified)
467
        self.assertFalse(user.is_active)
468
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
469

    
470
        # hmmm, email exists; lets request a password change
471
        r = self.client.get('/im/local/password_reset')
472
        self.assertEqual(r.status_code, 200)
473
        data = {'email': 'kpap@synnefo.org'}
474
        r = self.client.post('/im/local/password_reset', data, follow=True)
475
        # she can't because account is not active yet
476
        self.assertContains(r, 'pending activation')
477

    
478
        # moderation is enabled and an activation email has already been sent
479
        # so user can trigger resend of the activation email
480
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
481
        self.assertContains(r, 'has been sent to your email address.')
482
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
483

    
484
        # also she cannot login
485
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
486
        r = self.client.post('/im/local', data, follow=True)
487
        self.assertContains(r, 'Resend activation')
488
        self.assertFalse(r.context['request'].user.is_authenticated())
489
        self.assertFalse('_pithos2_a' in self.client.cookies)
490

    
491
        # user sees the message and resends activation
492
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
493
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
494

    
495
        # logged in user cannot activate another account
496
        tmp_user = get_local_user("test_existing_user@synnefo.org")
497
        tmp_client = Client()
498
        tmp_client.login(username="test_existing_user@synnefo.org",
499
                         password="password")
500
        r = tmp_client.get(user.get_activation_url(), follow=True)
501
        self.assertContains(r, messages.LOGGED_IN_WARNING)
502

    
503
        r = self.client.get(user.get_activation_url(), follow=True)
504
        # previous code got invalidated
505
        self.assertEqual(r.status_code, 404)
506

    
507
        user = AstakosUser.objects.get(pk=user.pk)
508
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
509
        r = self.client.get(user.get_activation_url(), follow=True)
510
        self.assertRedirects(r, reverse('index'))
511
        # user sees that account is pending approval from admins
512
        self.assertContains(r, messages.NOTIFICATION_SENT)
513
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
514

    
515
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
516
        result = backend.handle_moderation(user)
517
        backend.send_result_notifications(result, user)
518
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
519
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
520

    
521
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
522
        r = self.client.get('/im/profile', follow=True)
523
        self.assertFalse(r.context['request'].user.is_authenticated())
524
        self.assertFalse('_pithos2_a' in self.client.cookies)
525
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
526

    
527
        user = AstakosUser.objects.get(pk=user.pk)
528
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
529
                                           'password': 'password'},
530
                             follow=True)
531
        # user activated and logged in, token cookie set
532
        self.assertTrue(r.context['request'].user.is_authenticated())
533
        self.assertTrue('_pithos2_a' in self.client.cookies)
534
        cookies = self.client.cookies
535
        self.assertTrue(quote(user.auth_token) in
536
                        cookies.get('_pithos2_a').value)
537
        r = self.client.get('/im/logout', follow=True)
538
        r = self.client.get('/im/')
539
        # user logged out, token cookie removed
540
        self.assertFalse(r.context['request'].user.is_authenticated())
541
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
542

    
543
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
544
        del self.client.cookies['_pithos2_a']
545

    
546
        # user can login
547
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
548
                                           'password': 'password'},
549
                             follow=True)
550
        self.assertTrue(r.context['request'].user.is_authenticated())
551
        self.assertTrue('_pithos2_a' in self.client.cookies)
552
        cookies = self.client.cookies
553
        self.assertTrue(quote(user.auth_token) in
554
                        cookies.get('_pithos2_a').value)
555
        self.client.get('/im/logout', follow=True)
556

    
557
        # user forgot password
558
        old_pass = user.password
559
        r = self.client.get('/im/local/password_reset')
560
        self.assertEqual(r.status_code, 200)
561
        r = self.client.post('/im/local/password_reset', {'email':
562
                                                          'kpap@synnefo.org'})
563
        self.assertEqual(r.status_code, 302)
564
        # email sent
565
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
566

    
567
        # user visits change password link
568
        user = AstakosUser.objects.get(pk=user.pk)
569
        r = self.client.get(user.get_password_reset_url())
570
        r = self.client.post(user.get_password_reset_url(),
571
                             {'new_password1': 'newpass',
572
                              'new_password2': 'newpass'})
573

    
574
        user = AstakosUser.objects.get(pk=user.pk)
575
        self.assertNotEqual(old_pass, user.password)
576

    
577
        # old pass is not usable
578
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
579
                                           'password': 'password'})
580
        self.assertContains(r, 'Please enter a correct username and password')
581
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
582
                                           'password': 'newpass'},
583
                             follow=True)
584
        self.assertTrue(r.context['request'].user.is_authenticated())
585
        self.client.logout()
586

    
587
        # tests of special local backends
588
        user = AstakosUser.objects.get(pk=user.pk)
589
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
590
        user.save()
591

    
592
        # non astakos local backends do not support password reset
593
        r = self.client.get('/im/local/password_reset')
594
        self.assertEqual(r.status_code, 200)
595
        r = self.client.post('/im/local/password_reset', {'email':
596
                                                          'kpap@synnefo.org'})
597
        # she can't because account is not active yet
598
        self.assertContains(r, "Changing password is not")
599

    
600

    
601
class UserActionsTests(TestCase):
602

    
603
    def test_email_change(self):
604
        # to test existing email validation
605
        get_local_user('existing@synnefo.org')
606

    
607
        # local user
608
        user = get_local_user('kpap@synnefo.org')
609

    
610
        # login as kpap
611
        self.client.login(username='kpap@synnefo.org', password='password')
612
        r = self.client.get('/im/profile', follow=True)
613
        user = r.context['request'].user
614
        self.assertTrue(user.is_authenticated())
615

    
616
        # change email is enabled
617
        r = self.client.get('/im/email_change')
618
        self.assertEqual(r.status_code, 200)
619
        self.assertFalse(user.email_change_is_pending())
620

    
621
        # request email change to an existing email fails
622
        data = {'new_email_address': 'existing@synnefo.org'}
623
        r = self.client.post('/im/email_change', data)
624
        self.assertContains(r, messages.EMAIL_USED)
625

    
626
        # proper email change
627
        data = {'new_email_address': 'kpap@gmail.com'}
628
        r = self.client.post('/im/email_change', data, follow=True)
629
        self.assertRedirects(r, '/im/profile')
630
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
631
        change1 = EmailChange.objects.get()
632

    
633
        # user sees a warning
634
        r = self.client.get('/im/email_change')
635
        self.assertEqual(r.status_code, 200)
636
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
637
        self.assertTrue(user.email_change_is_pending())
638

    
639
        # link was sent
640
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
641
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
642

    
643
        # proper email change
644
        data = {'new_email_address': 'kpap@yahoo.com'}
645
        r = self.client.post('/im/email_change', data, follow=True)
646
        self.assertRedirects(r, '/im/profile')
647
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
648
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
649
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
650
        change2 = EmailChange.objects.get()
651

    
652
        r = self.client.get(change1.get_url())
653
        self.assertEquals(r.status_code, 404)
654
        self.client.logout()
655

    
656
        invalid_client = Client()
657
        r = invalid_client.post('/im/local?',
658
                                {'username': 'existing@synnefo.org',
659
                                 'password': 'password'})
660
        r = invalid_client.get(change2.get_url(), follow=True)
661
        self.assertEquals(r.status_code, 403)
662

    
663
        r = self.client.post('/im/local?next=' + change2.get_url(),
664
                             {'username': 'kpap@synnefo.org',
665
                              'password': 'password',
666
                              'next': change2.get_url()},
667
                             follow=True)
668
        self.assertRedirects(r, '/im/profile')
669
        user = r.context['request'].user
670
        self.assertEquals(user.email, 'kpap@yahoo.com')
671
        self.assertEquals(user.username, 'kpap@yahoo.com')
672

    
673
        self.client.logout()
674
        r = self.client.post('/im/local?next=' + change2.get_url(),
675
                             {'username': 'kpap@synnefo.org',
676
                              'password': 'password',
677
                              'next': change2.get_url()},
678
                             follow=True)
679
        self.assertContains(r, "Please enter a correct username and password")
680
        self.assertEqual(user.emailchanges.count(), 0)
681

    
682

    
683
class TestAuthProviderViews(TestCase):
684

    
685
    def tearDown(self):
686
        AstakosUser.objects.all().delete()
687

    
688
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
689
                         AUTOMODERATE_POLICY=True)
690
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
691
                 FORCE_PROFILE_UPDATE=False)
692
    def test_user(self):
693
        Profile = AuthProviderPolicyProfile
694
        Pending = PendingThirdPartyUser
695
        User = AstakosUser
696

    
697
        User.objects.create(email="newuser@synnefo.org")
698
        get_local_user("olduser@synnefo.org")
699
        cl_olduser = ShibbolethClient()
700
        get_local_user("olduser2@synnefo.org")
701
        ShibbolethClient()
702
        cl_newuser = ShibbolethClient()
703
        cl_newuser2 = Client()
704

    
705
        academic_group, created = Group.objects.get_or_create(
706
            name='academic-login')
707
        academic_users = academic_group.user_set
708
        assert created
709
        policy_only_academic = Profile.objects.add_policy('academic_strict',
710
                                                          'shibboleth',
711
                                                          academic_group,
712
                                                          exclusive=True,
713
                                                          login=False,
714
                                                          add=False)
715

    
716
        # new academic user
717
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
718
        cl_newuser.set_tokens(eppn="newusereppn")
719
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
720
        pending = Pending.objects.get()
721
        identifier = pending.third_party_identifier
722
        signup_data = {'third_party_identifier': identifier,
723
                       'first_name': 'Academic',
724
                       'third_party_token': pending.token,
725
                       'last_name': 'New User',
726
                       'provider': 'shibboleth'}
727
        r = cl_newuser.post('/im/signup', signup_data)
728
        self.assertContains(r, "This field is required", )
729
        signup_data['email'] = 'olduser@synnefo.org'
730
        r = cl_newuser.post('/im/signup', signup_data)
731
        self.assertContains(r, "already an account with this email", )
732
        signup_data['email'] = 'newuser@synnefo.org'
733
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
734
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
735
        self.assertEqual(r.status_code, 404)
736
        newuser = User.objects.get(email="newuser@synnefo.org")
737
        activation_link = newuser.get_activation_url()
738
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
739

    
740
        # new non-academic user
741
        signup_data = {'first_name': 'Non Academic',
742
                       'last_name': 'New User',
743
                       'provider': 'local',
744
                       'password1': 'password',
745
                       'password2': 'password'}
746
        signup_data['email'] = 'olduser@synnefo.org'
747
        r = cl_newuser2.post('/im/signup', signup_data)
748
        self.assertContains(r, 'There is already an account with this '
749
                               'email address')
750
        signup_data['email'] = 'newuser@synnefo.org'
751
        r = cl_newuser2.post('/im/signup/', signup_data)
752
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
753
        r = self.client.get(activation_link, follow=True)
754
        self.assertEqual(r.status_code, 404)
755
        newuser = User.objects.get(email="newuser@synnefo.org")
756
        self.assertTrue(newuser.activation_sent)
757

    
758
        # activation sent, user didn't open verification url so additional
759
        # registrations invalidate the previous signups.
760
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
761
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
762
        pending = Pending.objects.get()
763
        identifier = pending.third_party_identifier
764
        signup_data = {'third_party_identifier': identifier,
765
                       'first_name': 'Academic',
766
                       'third_party_token': pending.token,
767
                       'last_name': 'New User',
768
                       'provider': 'shibboleth'}
769
        signup_data['email'] = 'newuser@synnefo.org'
770
        r = cl_newuser.post('/im/signup', signup_data)
771
        self.assertEqual(r.status_code, 302)
772
        newuser = User.objects.get(email="newuser@synnefo.org")
773
        self.assertTrue(newuser.activation_sent)
774
        activation_link = newuser.get_activation_url()
775
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
776
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
777
        self.assertRedirects(r, '/im/landing')
778
        newuser = User.objects.get(email="newuser@synnefo.org")
779
        self.assertEqual(newuser.is_active, True)
780
        self.assertEqual(newuser.email_verified, True)
781
        cl_newuser.logout()
782

    
783
        # cannot reactivate if suspended
784
        newuser.is_active = False
785
        newuser.save()
786
        r = cl_newuser.get(newuser.get_activation_url())
787
        newuser = User.objects.get(email="newuser@synnefo.org")
788
        self.assertFalse(newuser.is_active)
789

    
790
        # release suspension
791
        newuser.is_active = True
792
        newuser.save()
793

    
794
        cl_newuser.get('/im/login/shibboleth?', follow=True)
795
        local = auth.get_provider('local', newuser)
796
        self.assertEqual(local.get_add_policy, False)
797
        self.assertEqual(local.get_login_policy, False)
798
        r = cl_newuser.get(local.get_add_url, follow=True)
799
        self.assertRedirects(r, '/im/profile')
800
        self.assertContains(r, 'disabled for your')
801

    
802
        cl_olduser.login(username='olduser@synnefo.org', password="password")
803
        r = cl_olduser.get('/im/profile', follow=True)
804
        self.assertEqual(r.status_code, 200)
805
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
806
        self.assertContains(r, 'Your request is missing a unique token')
807
        cl_olduser.set_tokens(eppn="newusereppn")
808
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
809
        self.assertContains(r, 'already in use')
810
        cl_olduser.set_tokens(eppn="oldusereppn")
811
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
812
        self.assertContains(r, 'Academic login enabled for this account')
813

    
814
        user = User.objects.get(email="olduser@synnefo.org")
815
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
816
        local_provider = user.get_auth_provider('local')
817
        self.assertEqual(shib_provider.get_remove_policy, True)
818
        self.assertEqual(local_provider.get_remove_policy, True)
819

    
820
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
821
                                                          'shibboleth',
822
                                                          academic_group,
823
                                                          remove=False)
824
        user.groups.add(academic_group)
825
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
826
        local_provider = user.get_auth_provider('local')
827
        self.assertEqual(shib_provider.get_remove_policy, False)
828
        self.assertEqual(local_provider.get_remove_policy, True)
829
        self.assertEqual(local_provider.get_login_policy, False)
830

    
831
        cl_olduser.logout()
832
        login_data = {'username': 'olduser@synnefo.org',
833
                      'password': 'password'}
834
        r = cl_olduser.post('/im/local', login_data, follow=True)
835
        self.assertContains(r, "href='/im/login/shibboleth'>Academic login")
836
        Group.objects.all().delete()
837

    
838

    
839
class TestAuthProvidersAPI(TestCase):
840
    """
841
    Test auth_providers module API
842
    """
843

    
844
    def tearDown(self):
845
        Group.objects.all().delete()
846

    
847
    @im_settings(IM_MODULES=['local', 'shibboleth'])
848
    def test_create(self):
849
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
850
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
851

    
852
        module = 'shibboleth'
853
        identifier = 'SHIB_UUID'
854
        provider_params = {
855
            'affiliation': 'UNIVERSITY',
856
            'info': {'age': 27}
857
        }
858
        provider = auth.get_provider(module, user2, identifier,
859
                                     **provider_params)
860
        provider.add_to_user()
861
        provider = auth.get_provider(module, user, identifier,
862
                                     **provider_params)
863
        provider.add_to_user()
864
        user.email_verified = True
865
        user.save()
866
        self.assertRaises(Exception, provider.add_to_user)
867
        provider = user.get_auth_provider(module, identifier)
868
        self.assertEqual(user.get_auth_provider(
869
            module, identifier)._instance.info.get('age'), 27)
870

    
871
        module = 'local'
872
        identifier = None
873
        provider_params = {'auth_backend': 'ldap', 'info':
874
                          {'office': 'A1'}}
875
        provider = auth.get_provider(module, user, identifier,
876
                                     **provider_params)
877
        provider.add_to_user()
878
        self.assertFalse(provider.get_add_policy)
879
        self.assertRaises(Exception, provider.add_to_user)
880

    
881
        shib = user.get_auth_provider('shibboleth',
882
                                      'SHIB_UUID')
883
        self.assertTrue(shib.get_remove_policy)
884

    
885
        local = user.get_auth_provider('local')
886
        self.assertTrue(local.get_remove_policy)
887

    
888
        local.remove_from_user()
889
        self.assertFalse(shib.get_remove_policy)
890
        self.assertRaises(Exception, shib.remove_from_user)
891

    
892
        provider = user.get_auth_providers()[0]
893
        self.assertRaises(Exception, provider.add_to_user)
894

    
895
    @im_settings(IM_MODULES=['local', 'shibboleth'])
896
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
897
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
898
                                                 'group2'])
899
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
900
                        CREATION_GROUPS_POLICY=['localgroup-create',
901
                                                'group-create'])
902
    def test_add_groups(self):
903
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
904
        provider = auth.get_provider('shibboleth', user, 'test123')
905
        provider.add_to_user()
906
        user = AstakosUser.objects.get()
907
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
908
                              sorted([u'group1', u'group2', u'group-create']))
909

    
910
        local = auth.get_provider('local', user)
911
        local.add_to_user()
912
        provider = user.get_auth_provider('shibboleth')
913
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
914
        provider.remove_from_user()
915
        user = AstakosUser.objects.get()
916
        self.assertEqual(len(user.get_auth_providers()), 1)
917
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
918
                              sorted([u'group-create', u'localgroup']))
919

    
920
        local = user.get_auth_provider('local')
921
        self.assertRaises(Exception, local.remove_from_user)
922
        provider = auth.get_provider('shibboleth', user, 'test123')
923
        provider.add_to_user()
924
        user = AstakosUser.objects.get()
925
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
926
                              sorted([u'group-create', u'group1', u'group2',
927
                               u'localgroup']))
928
        Group.objects.all().delete()
929

    
930

    
931

    
932
    @im_settings(IM_MODULES=['local', 'shibboleth'])
933
    def test_policies(self):
934
        group_old, created = Group.objects.get_or_create(name='olduser')
935

    
936
        astakos_settings.MODERATION_ENABLED = True
937
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
938
            ['academic-user']
939
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
940
            ['google-user']
941

    
942
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
943
        user.groups.add(group_old)
944
        user.add_auth_provider('local')
945

    
946
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
947
        user2.add_auth_provider('shibboleth', identifier='shibid')
948

    
949
        user3 = AstakosUser.objects.create(email="kpap3@synnefo.org")
950
        user3.groups.add(group_old)
951
        user3.add_auth_provider('local')
952
        user3.add_auth_provider('shibboleth', identifier='1234')
953

    
954
        self.assertTrue(user2.groups.get(name='academic-user'))
955
        self.assertFalse(user2.groups.filter(name='olduser').count())
956

    
957
        local = auth_providers.get_provider('local')
958
        self.assertTrue(local.get_add_policy)
959

    
960
        academic_group = Group.objects.get(name='academic-user')
961
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
962
                                                     academic_group,
963
                                                     exclusive=True,
964
                                                     add=False,
965
                                                     login=False)
966
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
967
                                                     academic_group,
968
                                                     exclusive=True,
969
                                                     login=False,
970
                                                     add=False)
971
        # no duplicate entry gets created
972
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
973

    
974
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
975
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
976
                                                     user2,
977
                                                     remove=False)
978
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
979

    
980
        local = auth_providers.get_provider('local', user2)
981
        google = auth_providers.get_provider('google', user2)
982
        shibboleth = auth_providers.get_provider('shibboleth', user2)
983
        self.assertTrue(shibboleth.get_login_policy)
984
        self.assertFalse(shibboleth.get_remove_policy)
985
        self.assertFalse(local.get_add_policy)
986
        self.assertFalse(local.get_add_policy)
987
        self.assertFalse(google.get_add_policy)
988

    
989
        user2.groups.remove(Group.objects.get(name='academic-user'))
990
        self.assertTrue(local.get_add_policy)
991
        self.assertTrue(google.get_add_policy)
992
        user2.groups.add(Group.objects.get(name='academic-user'))
993

    
994
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
995
                                                     user2,
996
                                                     exclusive=True,
997
                                                     add=True)
998
        self.assertTrue(local.get_add_policy)
999
        self.assertTrue(google.get_add_policy)
1000

    
1001
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1002
        self.assertFalse(local.get_automoderate_policy)
1003
        self.assertFalse(google.get_automoderate_policy)
1004
        self.assertTrue(shibboleth.get_automoderate_policy)
1005

    
1006
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1007
                  'GOOGLE_ADD_GROUPS_POLICY']:
1008
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1009

    
1010

    
1011
    @shibboleth_settings(CREATE_POLICY=True)
1012
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1013
    def test_create_http(self):
1014
        # this should be wrapped inside a transaction
1015
        user = AstakosUser(email="test@test.com")
1016
        user.save()
1017
        provider = auth_providers.get_provider('shibboleth', user,
1018
                                               'test@academia.test')
1019
        provider.add_to_user()
1020
        user.get_auth_provider('shibboleth', 'test@academia.test')
1021
        provider = auth_providers.get_provider('local', user)
1022
        provider.add_to_user()
1023
        user.get_auth_provider('local')
1024

    
1025
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1026
        user = AstakosUser(email="test2@test.com")
1027
        user.save()
1028
        provider = auth_providers.get_provider('shibboleth', user,
1029
                                               'test@shibboleth.com',
1030
                                               **{'info': {'name':
1031
                                                                'User Test'}})
1032
        self.assertFalse(provider.get_create_policy)
1033
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1034
        self.assertTrue(provider.get_create_policy)
1035
        academic = provider.add_to_user()
1036

    
1037
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1038
    @shibboleth_settings(LIMIT_POLICY=2)
1039
    def test_policies(self):
1040
        user = get_local_user('kpap@synnefo.org')
1041
        user.add_auth_provider('shibboleth', identifier='1234')
1042
        user.add_auth_provider('shibboleth', identifier='12345')
1043

    
1044
        # default limit is 1
1045
        local = user.get_auth_provider('local')
1046
        self.assertEqual(local.get_add_policy, False)
1047

    
1048
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1049
        academic = user.get_auth_provider('shibboleth',
1050
                                          identifier='1234')
1051
        self.assertEqual(academic.get_add_policy, False)
1052
        newacademic = auth_providers.get_provider('shibboleth', user,
1053
                                                  identifier='123456')
1054
        self.assertEqual(newacademic.get_add_policy, True)
1055
        user.add_auth_provider('shibboleth', identifier='123456')
1056
        self.assertEqual(academic.get_add_policy, False)
1057
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1058

    
1059
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1060
    @shibboleth_settings(LIMIT_POLICY=2)
1061
    def test_messages(self):
1062
        user = get_local_user('kpap@synnefo.org')
1063
        user.add_auth_provider('shibboleth', identifier='1234')
1064
        user.add_auth_provider('shibboleth', identifier='12345')
1065
        provider = auth_providers.get_provider('shibboleth')
1066
        self.assertEqual(provider.get_message('title'), 'Academic')
1067
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1068
        # regenerate messages cache
1069
        provider = auth_providers.get_provider('shibboleth')
1070
        self.assertEqual(provider.get_message('title'), 'New title')
1071
        self.assertEqual(provider.get_message('login_title'),
1072
                         'New title LOGIN')
1073
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1074
        self.assertEqual(provider.get_module_icon,
1075
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1076
        self.assertEqual(provider.get_module_medium_icon,
1077
                         settings.MEDIA_URL +
1078
                         'im/auth/icons-medium/shibboleth.png')
1079

    
1080
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1081
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1082
        self.assertEqual(provider.get_method_details_msg,
1083
                         'Account: 12345')
1084
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1085
        self.assertEqual(provider.get_method_details_msg,
1086
                         'Account: 1234')
1087

    
1088
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1089
        self.assertEqual(provider.get_not_active_msg,
1090
                         "'Academic login' is disabled.")
1091

    
1092
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1093
    @shibboleth_settings(LIMIT_POLICY=2)
1094
    def test_templates(self):
1095
        user = get_local_user('kpap@synnefo.org')
1096
        user.add_auth_provider('shibboleth', identifier='1234')
1097
        user.add_auth_provider('shibboleth', identifier='12345')
1098

    
1099
        provider = auth_providers.get_provider('shibboleth')
1100
        self.assertEqual(provider.get_template('login'),
1101
                         'im/auth/shibboleth_login.html')
1102
        provider = auth_providers.get_provider('google')
1103
        self.assertEqual(provider.get_template('login'),
1104
                         'im/auth/generic_login.html')
1105

    
1106

    
1107
class TestActivationBackend(TestCase):
1108

    
1109
    def setUp(self):
1110
        # dummy call to pass through logging middleware
1111
        self.client.get('/im/')
1112

    
1113
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1114
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1115
    def test_policies(self):
1116
        backend = activation_backends.get_backend()
1117

    
1118
        # email matches RE_USER_EMAIL_PATTERNS
1119
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1120
                               is_active=False, email_verified=False)
1121
        backend.handle_verification(user1, user1.verification_code)
1122
        self.assertEqual(user1.accepted_policy, 'email')
1123

    
1124
        # manually moderated
1125
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1126
                               is_active=False, email_verified=False)
1127

    
1128
        backend.handle_verification(user2, user2.verification_code)
1129
        self.assertEqual(user2.moderated, False)
1130
        backend.handle_moderation(user2)
1131
        self.assertEqual(user2.moderated, True)
1132
        self.assertEqual(user2.accepted_policy, 'manual')
1133

    
1134
        # autoaccept due to provider automoderate policy
1135
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1136
                               is_active=False, email_verified=False)
1137
        user3.auth_providers.all().delete()
1138
        user3.add_auth_provider('shibboleth', identifier='shib123')
1139
        backend.handle_verification(user3, user3.verification_code)
1140
        self.assertEqual(user3.moderated, True)
1141
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1142

    
1143
    @im_settings(MODERATION_ENABLED=False,
1144
                 MANAGERS=(('Manager',
1145
                            'manager@synnefo.org'),),
1146
                 HELPDESK=(('Helpdesk',
1147
                            'helpdesk@synnefo.org'),),
1148
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1149
    def test_without_moderation(self):
1150
        backend = activation_backends.get_backend()
1151
        form = backend.get_signup_form('local')
1152
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1153

    
1154
        user_data = {
1155
            'email': 'kpap@synnefo.org',
1156
            'first_name': 'Kostas Papas',
1157
            'password1': '123',
1158
            'password2': '123'
1159
        }
1160
        form = backend.get_signup_form('local', user_data)
1161
        user = form.save(commit=False)
1162
        form.store_user(user)
1163
        self.assertEqual(user.is_active, False)
1164
        self.assertEqual(user.email_verified, False)
1165

    
1166
        # step one, registration
1167
        result = backend.handle_registration(user)
1168
        user = AstakosUser.objects.get()
1169
        self.assertEqual(user.is_active, False)
1170
        self.assertEqual(user.email_verified, False)
1171
        self.assertTrue(user.verification_code)
1172
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1173
        backend.send_result_notifications(result, user)
1174
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1175
        self.assertEqual(len(mail.outbox), 1)
1176

    
1177
        # step two, verify email (automatically
1178
        # moderates/accepts user, since moderation is disabled)
1179
        user = AstakosUser.objects.get()
1180
        valid_code = user.verification_code
1181

    
1182
        # test invalid code
1183
        result = backend.handle_verification(user, valid_code)
1184
        backend.send_result_notifications(result, user)
1185
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1186
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1187
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1188
        # verification + activated + greeting = 3
1189
        self.assertEqual(len(mail.outbox), 3)
1190
        user = AstakosUser.objects.get()
1191
        self.assertEqual(user.is_active, True)
1192
        self.assertEqual(user.moderated, True)
1193
        self.assertTrue(user.moderated_at)
1194
        self.assertEqual(user.email_verified, True)
1195
        self.assertTrue(user.activation_sent)
1196

    
1197
    @im_settings(MODERATION_ENABLED=True,
1198
                 MANAGERS=(('Manager',
1199
                            'manager@synnefo.org'),),
1200
                 HELPDESK=(('Helpdesk',
1201
                            'helpdesk@synnefo.org'),),
1202
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1203
    def test_with_moderation(self):
1204

    
1205
        backend = activation_backends.get_backend()
1206
        form = backend.get_signup_form('local')
1207
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1208

    
1209
        user_data = {
1210
            'email': 'kpap@synnefo.org',
1211
            'first_name': 'Kostas Papas',
1212
            'password1': '123',
1213
            'password2': '123'
1214
        }
1215
        form = backend.get_signup_form(provider='local',
1216
                                       initial_data=user_data)
1217
        user = form.save(commit=False)
1218
        form.store_user(user)
1219
        self.assertEqual(user.is_active, False)
1220
        self.assertEqual(user.email_verified, False)
1221

    
1222
        # step one, registration
1223
        result = backend.handle_registration(user)
1224
        user = AstakosUser.objects.get()
1225
        self.assertEqual(user.is_active, False)
1226
        self.assertEqual(user.email_verified, False)
1227
        self.assertTrue(user.verification_code)
1228
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1229
        backend.send_result_notifications(result, user)
1230
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1231
        self.assertEqual(len(mail.outbox), 1)
1232

    
1233
        # step two, verifying email
1234
        user = AstakosUser.objects.get()
1235
        valid_code = user.verification_code
1236
        invalid_code = user.verification_code + 'invalid'
1237

    
1238
        # test invalid code
1239
        result = backend.handle_verification(user, invalid_code)
1240
        self.assertEqual(result.status, backend.Result.ERROR)
1241
        backend.send_result_notifications(result, user)
1242
        user = AstakosUser.objects.get()
1243
        self.assertEqual(user.is_active, False)
1244
        self.assertEqual(user.moderated, False)
1245
        self.assertEqual(user.moderated_at, None)
1246
        self.assertEqual(user.email_verified, False)
1247
        self.assertTrue(user.activation_sent)
1248

    
1249
        # test valid code
1250
        user = AstakosUser.objects.get()
1251
        result = backend.handle_verification(user, valid_code)
1252
        backend.send_result_notifications(result, user)
1253
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1254
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1255
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1256
        self.assertEqual(len(mail.outbox), 2)
1257
        user = AstakosUser.objects.get()
1258
        self.assertEqual(user.moderated, False)
1259
        self.assertEqual(user.moderated_at, None)
1260
        self.assertEqual(user.email_verified, True)
1261
        self.assertTrue(user.activation_sent)
1262

    
1263
        # test code reuse
1264
        result = backend.handle_verification(user, valid_code)
1265
        self.assertEqual(result.status, backend.Result.ERROR)
1266
        user = AstakosUser.objects.get()
1267
        self.assertEqual(user.is_active, False)
1268
        self.assertEqual(user.moderated, False)
1269
        self.assertEqual(user.moderated_at, None)
1270
        self.assertEqual(user.email_verified, True)
1271
        self.assertTrue(user.activation_sent)
1272

    
1273
        # valid code on verified user
1274
        user = AstakosUser.objects.get()
1275
        valid_code = user.verification_code
1276
        result = backend.handle_verification(user, valid_code)
1277
        self.assertEqual(result.status, backend.Result.ERROR)
1278

    
1279
        # step three, moderation user
1280
        user = AstakosUser.objects.get()
1281
        result = backend.handle_moderation(user)
1282
        backend.send_result_notifications(result, user)
1283

    
1284
        user = AstakosUser.objects.get()
1285
        self.assertEqual(user.is_active, True)
1286
        self.assertEqual(user.moderated, True)
1287
        self.assertTrue(user.moderated_at)
1288
        self.assertEqual(user.email_verified, True)
1289
        self.assertTrue(user.activation_sent)