Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ a248ebbb

History | View | Annotate | Download (57.4 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from astakos.im.tests.common import *
35

    
36
ui_url = lambda url: '/' + astakos_settings.BASE_PATH + '/ui/%s' % url
37

    
38

    
39
class ShibbolethTests(TestCase):
40
    """
41
    Testing shibboleth authentication.
42
    """
43

    
44
    def setUp(self):
45
        self.client = ShibbolethClient()
46
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
47
        astakos_settings.MODERATION_ENABLED = True
48

    
49
    def tearDown(self):
50
        AstakosUser.objects.all().delete()
51

    
52
    @im_settings(FORCE_PROFILE_UPDATE=False)
53
    def test_create_account(self):
54

    
55
        client = ShibbolethClient()
56

    
57
        # shibboleth views validation
58
        # eepn required
59
        r = client.get(ui_url('login/shibboleth?'), follow=True)
60
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
61
            'domain': astakos_settings.BASE_HOST,
62
            'contact_email': settings.CONTACT_EMAIL
63
        })
64
        client.set_tokens(eppn="kpapeppn")
65

    
66
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
67
        # shibboleth user info required
68
        r = client.get(ui_url('login/shibboleth?'), follow=True)
69
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
70
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
71

    
72
        # shibboleth logged us in
73
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
74
                          cn="Kostas Papadimitriou",
75
                          ep_affiliation="Test Affiliation")
76
        r = client.get(ui_url('login/shibboleth?'), follow=True)
77
        token = PendingThirdPartyUser.objects.get().token
78
        self.assertRedirects(r, ui_url('signup?third_party_token=%s' % token))
79
        self.assertEqual(r.status_code, 200)
80

    
81
        # a new pending user created
82
        pending_user = PendingThirdPartyUser.objects.get(
83
            third_party_identifier="kpapeppn")
84
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
85
        # keep the token for future use
86
        token = pending_user.token
87
        # from now on no shibboleth headers are sent to the server
88
        client.reset_tokens()
89

    
90
        # this is the old way, it should fail, to avoid pending user take over
91
        r = client.get(ui_url('shibboleth/signup/%s' % pending_user.username))
92
        self.assertEqual(r.status_code, 404)
93

    
94
        # this is the signup unique url associated with the pending user
95
        # created
96
        r = client.get(ui_url('signup/?third_party_token=%s' % token))
97
        identifier = pending_user.third_party_identifier
98
        post_data = {'third_party_identifier': identifier,
99
                     'first_name': 'Kostas',
100
                     'third_party_token': token,
101
                     'last_name': 'Mitroglou',
102
                     'provider': 'shibboleth'}
103

    
104
        signup_url = reverse('signup')
105

    
106
        # invlid email
107
        post_data['email'] = 'kpap'
108
        r = client.post(signup_url, post_data)
109
        self.assertContains(r, token)
110

    
111
        # existing email
112
        existing_user = get_local_user('test@test.com')
113
        post_data['email'] = 'test@test.com'
114
        r = client.post(signup_url, post_data)
115
        self.assertContains(r, messages.EMAIL_USED)
116
        existing_user.delete()
117

    
118
        # and finally a valid signup
119
        post_data['email'] = 'kpap@synnefo.org'
120
        r = client.post(signup_url, post_data, follow=True)
121
        self.assertContains(r, messages.VERIFICATION_SENT)
122

    
123
        # entires commited as expected
124
        self.assertEqual(AstakosUser.objects.count(), 1)
125
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
126
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
127

    
128
        # provider info stored
129
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
130
        self.assertEqual(provider.affiliation, 'Test Affiliation')
131
        self.assertEqual(provider.info, {u'email': u'kpap@synnefo.org',
132
                                         u'eppn': u'kpapeppn',
133
                                         u'name': u'Kostas Papadimitriou'})
134

    
135
        # login (not activated yet)
136
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
137
                          cn="Kostas Papadimitriou", )
138
        r = client.get(ui_url("login/shibboleth?"), follow=True)
139
        self.assertContains(r, 'is pending moderation')
140

    
141
        # admin activates the user
142
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
143
        backend = activation_backends.get_backend()
144
        activation_result = backend.verify_user(u, u.verification_code)
145
        activation_result = backend.accept_user(u)
146
        self.assertFalse(activation_result.is_error())
147
        backend.send_result_notifications(activation_result, u)
148
        self.assertEqual(u.is_active, True)
149

    
150
        # we see our profile
151
        r = client.get(ui_url("login/shibboleth?"), follow=True)
152
        self.assertRedirects(r, ui_url('landing'))
153
        self.assertEqual(r.status_code, 200)
154

    
155
    def test_existing(self):
156
        """
157
        Test adding of third party login to an existing account
158
        """
159

    
160
        # this is our existing user
161
        existing_user = get_local_user('kpap@synnefo.org')
162
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
163
        existing_inactive.is_active = False
164
        existing_inactive.save()
165

    
166
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
167
        existing_unverified.is_active = False
168
        existing_unverified.activation_sent = None
169
        existing_unverified.email_verified = False
170
        existing_unverified.is_verified = False
171
        existing_unverified.save()
172

    
173
        client = ShibbolethClient()
174
        # shibboleth logged us in, notice that we use different email
175
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
176
                          cn="Kostas Papadimitriou", )
177
        r = client.get(ui_url("login/shibboleth?"), follow=True)
178

    
179
        # a new pending user created
180
        pending_user = PendingThirdPartyUser.objects.get()
181
        token = pending_user.token
182
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
183
        pending_key = pending_user.token
184
        client.reset_tokens()
185
        self.assertRedirects(r, ui_url("signup?third_party_token=%s" % token))
186

    
187
        form = r.context['login_form']
188
        signupdata = copy.copy(form.initial)
189
        signupdata['email'] = 'kpap@synnefo.org'
190
        signupdata['third_party_token'] = token
191
        signupdata['provider'] = 'shibboleth'
192
        signupdata.pop('id', None)
193

    
194
        # the email exists to another user
195
        r = client.post(ui_url("signup"), signupdata)
196
        self.assertContains(r, "There is already an account with this email "
197
                               "address")
198
        # change the case, still cannot create
199
        signupdata['email'] = 'KPAP@synnefo.org'
200
        r = client.post(ui_url("signup"), signupdata)
201
        self.assertContains(r, "There is already an account with this email "
202
                               "address")
203
        # inactive user
204
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
205
        r = client.post(ui_url("signup"), signupdata)
206
        self.assertContains(r, "There is already an account with this email "
207
                               "address")
208

    
209
        # unverified user, this should pass, old entry will be deleted
210
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
211
        r = client.post(ui_url("signup"), signupdata)
212

    
213
        post_data = {'password': 'password',
214
                     'username': 'kpap@synnefo.org'}
215
        r = client.post(ui_url('local'), post_data, follow=True)
216
        self.assertTrue(r.context['request'].user.is_authenticated())
217
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
218
                          cn="Kostas Papadimitriou", )
219
        r = client.get(ui_url("login/shibboleth?"), follow=True)
220
        self.assertContains(r, "enabled for this account")
221
        client.reset_tokens()
222

    
223
        user = existing_user
224
        self.assertTrue(user.has_auth_provider('shibboleth'))
225
        self.assertTrue(user.has_auth_provider('local',
226
                                               auth_backend='astakos'))
227
        client.logout()
228

    
229
        # look Ma, i can login with both my shibboleth and local account
230
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
231
                          cn="Kostas Papadimitriou")
232
        r = client.get(ui_url("login/shibboleth?"), follow=True)
233
        self.assertTrue(r.context['request'].user.is_authenticated())
234
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
235
        self.assertRedirects(r, ui_url('landing'))
236
        self.assertEqual(r.status_code, 200)
237
        client.logout()
238
        client.reset_tokens()
239

    
240
        # logged out
241
        r = client.get(ui_url("profile"), follow=True)
242
        self.assertFalse(r.context['request'].user.is_authenticated())
243

    
244
        # login with local account also works
245
        post_data = {'password': 'password',
246
                     'username': 'kpap@synnefo.org'}
247
        r = self.client.post(ui_url('local'), post_data, follow=True)
248
        self.assertTrue(r.context['request'].user.is_authenticated())
249
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
250
        self.assertRedirects(r, ui_url('landing'))
251
        self.assertEqual(r.status_code, 200)
252

    
253
        # cannot add the same eppn
254
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
255
                          cn="Kostas Papadimitriou", )
256
        r = client.get(ui_url("login/shibboleth?"), follow=True)
257
        self.assertRedirects(r, ui_url('landing'))
258
        self.assertTrue(r.status_code, 200)
259
        self.assertEquals(existing_user.auth_providers.count(), 2)
260

    
261
        # only one allowed by default
262
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
263
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
264
        prov = auth_providers.get_provider('shibboleth')
265
        r = client.get(ui_url("login/shibboleth?"), follow=True)
266
        self.assertContains(r, "Failed to add")
267
        self.assertRedirects(r, ui_url('profile'))
268
        self.assertTrue(r.status_code, 200)
269
        self.assertEquals(existing_user.auth_providers.count(), 2)
270
        client.logout()
271
        client.reset_tokens()
272

    
273
        # cannot login with another eppn
274
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppninvalid",
275
                          cn="Kostas Papadimitriou")
276
        r = client.get(ui_url("login/shibboleth?"), follow=True)
277
        self.assertFalse(r.context['request'].user.is_authenticated())
278

    
279
        # cannot
280

    
281
        # lets remove local password
282
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
283
                                       email="kpap@synnefo.org")
284
        remove_local_url = user.get_auth_provider('local').get_remove_url
285
        remove_shibbo_url = user.get_auth_provider('shibboleth',
286
                                                   'kpapeppn').get_remove_url
287
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
288
                          cn="Kostas Papadimtriou")
289
        r = client.get(ui_url("login/shibboleth?"), follow=True)
290
        client.reset_tokens()
291

    
292
        # only POST is allowed (for CSRF protection)
293
        r = client.get(remove_local_url, follow=True)
294
        self.assertEqual(r.status_code, 405)
295

    
296
        r = client.post(remove_local_url, follow=True)
297
        # 2 providers left
298
        self.assertEqual(user.auth_providers.count(), 1)
299
        # cannot remove last provider
300
        r = client.post(remove_shibbo_url)
301
        self.assertEqual(r.status_code, 403)
302
        self.client.logout()
303

    
304
        # cannot login using local credentials (notice we use another client)
305
        post_data = {'password': 'password',
306
                     'username': 'kpap@synnefo.org'}
307
        r = self.client.post(ui_url('local'), post_data, follow=True)
308
        self.assertFalse(r.context['request'].user.is_authenticated())
309

    
310
        # we can reenable the local provider by setting a password
311
        r = client.get(ui_url("password_change"), follow=True)
312
        r = client.post(ui_url("password_change"), {'new_password1': '111',
313
                                                'new_password2': '111'},
314
                        follow=True)
315
        user = r.context['request'].user
316
        self.assertTrue(user.has_auth_provider('local'))
317
        self.assertTrue(user.has_auth_provider('shibboleth'))
318
        self.assertTrue(user.check_password('111'))
319
        self.assertTrue(user.has_usable_password())
320
        self.client.logout()
321

    
322
        # now we can login
323
        post_data = {'password': '111',
324
                     'username': 'kpap@synnefo.org'}
325
        r = self.client.post(ui_url('local'), post_data, follow=True)
326
        self.assertTrue(r.context['request'].user.is_authenticated())
327

    
328
        client.reset_tokens()
329

    
330
        # we cannot take over another shibboleth identifier
331
        user2 = get_local_user('another@synnefo.org')
332
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
333
        # login
334
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
335
                          cn="Kostas Papadimitriou")
336
        r = client.get(ui_url("login/shibboleth?"), follow=True)
337
        # try to assign existing shibboleth identifier of another user
338
        client.set_tokens(mail="kpap_second@shibboleth.gr",
339
                          eppn="existingeppn", cn="Kostas Papadimitriou")
340
        r = client.get(ui_url("login/shibboleth?"), follow=True)
341
        self.assertContains(r, "is already in use")
342

    
343

    
344
class TestLocal(TestCase):
345

    
346
    def setUp(self):
347
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
348
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
349
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
350
        settings.ASTAKOS_MODERATION_ENABLED = True
351

    
352
    def tearDown(self):
353
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
354
        AstakosUser.objects.all().delete()
355

    
356
    def test_no_moderation(self):
357
        # disable moderation
358
        astakos_settings.MODERATION_ENABLED = False
359

    
360
        # create a new user
361
        r = self.client.get(ui_url("signup"))
362
        self.assertEqual(r.status_code, 200)
363
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
364
                'password2': 'password', 'first_name': 'Kostas',
365
                'last_name': 'Mitroglou', 'provider': 'local'}
366
        r = self.client.post(ui_url("signup"), data)
367

    
368
        # user created
369
        self.assertEqual(AstakosUser.objects.count(), 1)
370
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
371
                                       email="kpap@synnefo.org")
372
        self.assertEqual(user.username, 'kpap@synnefo.org')
373
        self.assertEqual(user.has_auth_provider('local'), True)
374
        self.assertFalse(user.is_active)
375

    
376
        # user (but not admin) gets notified
377
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
378
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
379
        astakos_settings.MODERATION_ENABLED = True
380

    
381
    def test_email_case(self):
382
        data = {
383
            'email': 'kPap@synnefo.org',
384
            'password1': '1234',
385
            'password2': '1234'
386
        }
387

    
388
        form = forms.LocalUserCreationForm(data)
389
        self.assertTrue(form.is_valid())
390
        user = form.save()
391
        form.store_user(user, {})
392

    
393
        u = AstakosUser.objects.get()
394
        self.assertEqual(u.email, 'kPap@synnefo.org')
395
        self.assertEqual(u.username, 'kpap@synnefo.org')
396
        u.is_active = True
397
        u.email_verified = True
398
        u.save()
399

    
400
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
401
        login = forms.LoginForm(data=data)
402
        self.assertTrue(login.is_valid())
403

    
404
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
405
        login = forms.LoginForm(data=data)
406
        self.assertTrue(login.is_valid())
407

    
408
        data = {
409
            'email': 'kpap@synnefo.org',
410
            'password1': '1234',
411
            'password2': '1234'
412
        }
413
        form = forms.LocalUserCreationForm(data)
414
        self.assertFalse(form.is_valid())
415

    
416
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
417
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
418
    def test_local_provider(self):
419
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
420

    
421
        # create a user
422
        r = self.client.get(ui_url("signup"))
423
        self.assertEqual(r.status_code, 200)
424
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
425
                'password2': 'password', 'first_name': 'Kostas',
426
                'last_name': 'Mitroglou', 'provider': 'local'}
427
        r = self.client.post(ui_url("signup"), data)
428

    
429
        # user created
430
        self.assertEqual(AstakosUser.objects.count(), 1)
431
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
432
                                       email="kpap@synnefo.org")
433
        self.assertEqual(user.username, 'kpap@synnefo.org')
434
        self.assertEqual(user.has_auth_provider('local'), True)
435
        self.assertFalse(user.is_active)  # not activated
436
        self.assertFalse(user.email_verified)  # not verified
437
        self.assertTrue(user.activation_sent)  # activation automatically sent
438
        self.assertFalse(user.moderated)
439
        self.assertFalse(user.email_verified)
440

    
441
        # admin gets notified and activates the user from the command line
442
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
443
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
444
                                           'password': 'password'},
445
                             follow=True)
446
        self.assertContains(r, messages.VERIFICATION_SENT)
447
        backend = activation_backends.get_backend()
448

    
449
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
450
        backend.send_user_verification_email(user)
451

    
452
        # user activation fields updated and user gets notified via email
453
        user = AstakosUser.objects.get(pk=user.pk)
454
        self.assertTrue(user.activation_sent)
455
        self.assertFalse(user.email_verified)
456
        self.assertFalse(user.is_active)
457
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
458

    
459
        # user forgot she got registered and tries to submit registration
460
        # form. Notice the upper case in email
461
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
462
                'password2': 'password', 'first_name': 'Kostas',
463
                'last_name': 'Mitroglou', 'provider': 'local'}
464
        r = self.client.post(ui_url("signup"), data, follow=True)
465
        self.assertRedirects(r, reverse('index'))
466
        self.assertContains(r, messages.VERIFICATION_SENT)
467

    
468
        user = AstakosUser.objects.get()
469
        # previous user replaced
470
        self.assertTrue(user.activation_sent)
471
        self.assertFalse(user.email_verified)
472
        self.assertFalse(user.is_active)
473
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
474

    
475
        # hmmm, email exists; lets request a password change
476
        r = self.client.get(ui_url('local/password_reset'))
477
        self.assertEqual(r.status_code, 200)
478
        data = {'email': 'kpap@synnefo.org'}
479
        r = self.client.post(ui_url('local/password_reset'), data, follow=True)
480
        # she can't because account is not active yet
481
        self.assertContains(r, 'pending activation')
482

    
483
        # moderation is enabled and an activation email has already been sent
484
        # so user can trigger resend of the activation email
485
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
486
                            follow=True)
487
        self.assertContains(r, 'has been sent to your email address.')
488
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
489

    
490
        # also she cannot login
491
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
492
        r = self.client.post(ui_url('local'), data, follow=True)
493
        self.assertContains(r, 'Resend activation')
494
        self.assertFalse(r.context['request'].user.is_authenticated())
495
        self.assertFalse('_pithos2_a' in self.client.cookies)
496

    
497
        # user sees the message and resends activation
498
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
499
                            follow=True)
500
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
501

    
502
        # logged in user cannot activate another account
503
        tmp_user = get_local_user("test_existing_user@synnefo.org")
504
        tmp_client = Client()
505
        tmp_client.login(username="test_existing_user@synnefo.org",
506
                         password="password")
507
        r = tmp_client.get(user.get_activation_url(), follow=True)
508
        self.assertContains(r, messages.LOGGED_IN_WARNING)
509

    
510
        r = self.client.get(user.get_activation_url(), follow=True)
511
        # previous code got invalidated
512
        self.assertEqual(r.status_code, 404)
513

    
514
        user = AstakosUser.objects.get(pk=user.pk)
515
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
516
        r = self.client.get(user.get_activation_url(), follow=True)
517
        self.assertRedirects(r, reverse('index'))
518
        # user sees that account is pending approval from admins
519
        self.assertContains(r, messages.NOTIFICATION_SENT)
520
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
521

    
522
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
523
        result = backend.handle_moderation(user)
524
        backend.send_result_notifications(result, user)
525
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
526
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
527

    
528
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
529
        r = self.client.get(ui_url('profile'), follow=True)
530
        self.assertFalse(r.context['request'].user.is_authenticated())
531
        self.assertFalse('_pithos2_a' in self.client.cookies)
532
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
533

    
534
        user = AstakosUser.objects.get(pk=user.pk)
535
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
536
                                               'password': 'password'},
537
                             follow=True)
538
        # user activated and logged in, token cookie set
539
        self.assertTrue(r.context['request'].user.is_authenticated())
540
        self.assertTrue('_pithos2_a' in self.client.cookies)
541
        cookies = self.client.cookies
542
        self.assertTrue(quote(user.auth_token) in
543
                        cookies.get('_pithos2_a').value)
544
        r = self.client.get(ui_url('logout'), follow=True)
545
        r = self.client.get(ui_url(''))
546
        # user logged out, token cookie removed
547
        self.assertFalse(r.context['request'].user.is_authenticated())
548
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
549

    
550
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
551
        del self.client.cookies['_pithos2_a']
552

    
553
        # user can login
554
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
555
                                               'password': 'password'},
556
                             follow=True)
557
        self.assertTrue(r.context['request'].user.is_authenticated())
558
        self.assertTrue('_pithos2_a' in self.client.cookies)
559
        cookies = self.client.cookies
560
        self.assertTrue(quote(user.auth_token) in
561
                        cookies.get('_pithos2_a').value)
562
        self.client.get(ui_url('logout'), follow=True)
563

    
564
        # user forgot password
565
        old_pass = user.password
566
        r = self.client.get(ui_url('local/password_reset'))
567
        self.assertEqual(r.status_code, 200)
568
        r = self.client.post(ui_url('local/password_reset'),
569
                             {'email': 'kpap@synnefo.org'})
570
        self.assertEqual(r.status_code, 302)
571
        # email sent
572
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
573

    
574
        # user visits change password link
575
        user = AstakosUser.objects.get(pk=user.pk)
576
        r = self.client.get(user.get_password_reset_url())
577
        r = self.client.post(user.get_password_reset_url(),
578
                             {'new_password1': 'newpass',
579
                              'new_password2': 'newpass'})
580

    
581
        user = AstakosUser.objects.get(pk=user.pk)
582
        self.assertNotEqual(old_pass, user.password)
583

    
584
        # old pass is not usable
585
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
586
                                               'password': 'password'})
587
        self.assertContains(r, 'Please enter a correct username and password')
588
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
589
                                               'password': 'newpass'},
590
                             follow=True)
591
        self.assertTrue(r.context['request'].user.is_authenticated())
592
        self.client.logout()
593

    
594
        # tests of special local backends
595
        user = AstakosUser.objects.get(pk=user.pk)
596
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
597
        user.save()
598

    
599
        # non astakos local backends do not support password reset
600
        r = self.client.get(ui_url('local/password_reset'))
601
        self.assertEqual(r.status_code, 200)
602
        r = self.client.post(ui_url('local/password_reset'),
603
                             {'email': 'kpap@synnefo.org'})
604
        # she can't because account is not active yet
605
        self.assertContains(r, "Changing password is not")
606

    
607

    
608
class UserActionsTests(TestCase):
609

    
610
    def test_email_change(self):
611
        # to test existing email validation
612
        get_local_user('existing@synnefo.org')
613

    
614
        # local user
615
        user = get_local_user('kpap@synnefo.org')
616

    
617
        # login as kpap
618
        self.client.login(username='kpap@synnefo.org', password='password')
619
        r = self.client.get(ui_url('profile'), follow=True)
620
        user = r.context['request'].user
621
        self.assertTrue(user.is_authenticated())
622

    
623
        # change email is enabled
624
        r = self.client.get(ui_url('email_change'))
625
        self.assertEqual(r.status_code, 200)
626
        self.assertFalse(user.email_change_is_pending())
627

    
628
        # request email change to an existing email fails
629
        data = {'new_email_address': 'existing@synnefo.org'}
630
        r = self.client.post(ui_url('email_change'), data)
631
        self.assertContains(r, messages.EMAIL_USED)
632

    
633
        # proper email change
634
        data = {'new_email_address': 'kpap@gmail.com'}
635
        r = self.client.post(ui_url('email_change'), data, follow=True)
636
        self.assertRedirects(r, ui_url('profile'))
637
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
638
        change1 = EmailChange.objects.get()
639

    
640
        # user sees a warning
641
        r = self.client.get(ui_url('email_change'))
642
        self.assertEqual(r.status_code, 200)
643
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
644
        self.assertTrue(user.email_change_is_pending())
645

    
646
        # link was sent
647
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
648
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
649

    
650
        # proper email change
651
        data = {'new_email_address': 'kpap@yahoo.com'}
652
        r = self.client.post(ui_url('email_change'), data, follow=True)
653
        self.assertRedirects(r, ui_url('profile'))
654
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
655
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
656
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
657
        change2 = EmailChange.objects.get()
658

    
659
        r = self.client.get(change1.get_url())
660
        self.assertEquals(r.status_code, 404)
661
        self.client.logout()
662

    
663
        invalid_client = Client()
664
        r = invalid_client.post(ui_url('local?'),
665
                                {'username': 'existing@synnefo.org',
666
                                 'password': 'password'})
667
        r = invalid_client.get(change2.get_url(), follow=True)
668
        self.assertEquals(r.status_code, 403)
669

    
670
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
671
                             {'username': 'kpap@synnefo.org',
672
                              'password': 'password',
673
                              'next': change2.get_url()},
674
                             follow=True)
675
        self.assertRedirects(r, ui_url('profile'))
676
        user = r.context['request'].user
677
        self.assertEquals(user.email, 'kpap@yahoo.com')
678
        self.assertEquals(user.username, 'kpap@yahoo.com')
679

    
680
        self.client.logout()
681
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
682
                             {'username': 'kpap@synnefo.org',
683
                              'password': 'password',
684
                              'next': change2.get_url()},
685
                             follow=True)
686
        self.assertContains(r, "Please enter a correct username and password")
687
        self.assertEqual(user.emailchanges.count(), 0)
688

    
689
        AstakosUser.objects.all().delete()
690
        Group.objects.all().delete()
691

    
692

    
693
class TestAuthProviderViews(TestCase):
694

    
695
    def tearDown(self):
696
        AstakosUser.objects.all().delete()
697

    
698
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
699
                         AUTOMODERATE_POLICY=True)
700
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
701
                 FORCE_PROFILE_UPDATE=False)
702
    def test_user(self):
703
        Profile = AuthProviderPolicyProfile
704
        Pending = PendingThirdPartyUser
705
        User = AstakosUser
706

    
707
        User.objects.create(email="newuser@synnefo.org")
708
        get_local_user("olduser@synnefo.org")
709
        cl_olduser = ShibbolethClient()
710
        get_local_user("olduser2@synnefo.org")
711
        ShibbolethClient()
712
        cl_newuser = ShibbolethClient()
713
        cl_newuser2 = Client()
714

    
715
        academic_group, created = Group.objects.get_or_create(
716
            name='academic-login')
717
        academic_users = academic_group.user_set
718
        assert created
719
        policy_only_academic = Profile.objects.add_policy('academic_strict',
720
                                                          'shibboleth',
721
                                                          academic_group,
722
                                                          exclusive=True,
723
                                                          login=False,
724
                                                          add=False)
725

    
726
        # new academic user
727
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
728
        cl_newuser.set_tokens(eppn="newusereppn")
729
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
730
        pending = Pending.objects.get()
731
        identifier = pending.third_party_identifier
732
        signup_data = {'third_party_identifier': identifier,
733
                       'first_name': 'Academic',
734
                       'third_party_token': pending.token,
735
                       'last_name': 'New User',
736
                       'provider': 'shibboleth'}
737
        r = cl_newuser.post(ui_url('signup'), signup_data)
738
        self.assertContains(r, "This field is required", )
739
        signup_data['email'] = 'olduser@synnefo.org'
740
        r = cl_newuser.post(ui_url('signup'), signup_data)
741
        self.assertContains(r, "already an account with this email", )
742
        signup_data['email'] = 'newuser@synnefo.org'
743
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
744
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
745
        self.assertEqual(r.status_code, 404)
746
        newuser = User.objects.get(email="newuser@synnefo.org")
747
        activation_link = newuser.get_activation_url()
748
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
749

    
750
        # new non-academic user
751
        signup_data = {'first_name': 'Non Academic',
752
                       'last_name': 'New User',
753
                       'provider': 'local',
754
                       'password1': 'password',
755
                       'password2': 'password'}
756
        signup_data['email'] = 'olduser@synnefo.org'
757
        r = cl_newuser2.post(ui_url('signup'), signup_data)
758
        self.assertContains(r, 'There is already an account with this '
759
                               'email address')
760
        signup_data['email'] = 'newuser@synnefo.org'
761
        r = cl_newuser2.post(ui_url('signup/'), signup_data)
762
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
763
        r = self.client.get(activation_link, follow=True)
764
        self.assertEqual(r.status_code, 404)
765
        newuser = User.objects.get(email="newuser@synnefo.org")
766
        self.assertTrue(newuser.activation_sent)
767

    
768
        # activation sent, user didn't open verification url so additional
769
        # registrations invalidate the previous signups.
770
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
771
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
772
        pending = Pending.objects.get()
773
        identifier = pending.third_party_identifier
774
        signup_data = {'third_party_identifier': identifier,
775
                       'first_name': 'Academic',
776
                       'third_party_token': pending.token,
777
                       'last_name': 'New User',
778
                       'provider': 'shibboleth'}
779
        signup_data['email'] = 'newuser@synnefo.org'
780
        r = cl_newuser.post(ui_url('signup'), signup_data)
781
        self.assertEqual(r.status_code, 302)
782
        newuser = User.objects.get(email="newuser@synnefo.org")
783
        self.assertTrue(newuser.activation_sent)
784
        activation_link = newuser.get_activation_url()
785
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
786
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
787
        self.assertRedirects(r, ui_url('landing'))
788
        newuser = User.objects.get(email="newuser@synnefo.org")
789
        self.assertEqual(newuser.is_active, True)
790
        self.assertEqual(newuser.email_verified, True)
791
        cl_newuser.logout()
792

    
793
        # cannot reactivate if suspended
794
        newuser.is_active = False
795
        newuser.save()
796
        r = cl_newuser.get(newuser.get_activation_url())
797
        newuser = User.objects.get(email="newuser@synnefo.org")
798
        self.assertFalse(newuser.is_active)
799

    
800
        # release suspension
801
        newuser.is_active = True
802
        newuser.save()
803

    
804
        cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
805
        local = auth.get_provider('local', newuser)
806
        self.assertEqual(local.get_add_policy, False)
807
        self.assertEqual(local.get_login_policy, False)
808
        r = cl_newuser.get(local.get_add_url, follow=True)
809
        self.assertRedirects(r, ui_url('profile'))
810
        self.assertContains(r, 'disabled for your')
811

    
812
        cl_olduser.login(username='olduser@synnefo.org', password="password")
813
        r = cl_olduser.get(ui_url('profile'), follow=True)
814
        self.assertEqual(r.status_code, 200)
815
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
816
        self.assertContains(r, 'Your request is missing a unique token')
817
        cl_olduser.set_tokens(eppn="newusereppn")
818
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
819
        self.assertContains(r, 'already in use')
820
        cl_olduser.set_tokens(eppn="oldusereppn")
821
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
822
        self.assertContains(r, 'Academic login enabled for this account')
823

    
824
        user = User.objects.get(email="olduser@synnefo.org")
825
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
826
        local_provider = user.get_auth_provider('local')
827
        self.assertEqual(shib_provider.get_remove_policy, True)
828
        self.assertEqual(local_provider.get_remove_policy, True)
829

    
830
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
831
                                                          'shibboleth',
832
                                                          academic_group,
833
                                                          remove=False)
834
        user.groups.add(academic_group)
835
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
836
        local_provider = user.get_auth_provider('local')
837
        self.assertEqual(shib_provider.get_remove_policy, False)
838
        self.assertEqual(local_provider.get_remove_policy, True)
839
        self.assertEqual(local_provider.get_login_policy, False)
840

    
841
        cl_olduser.logout()
842
        login_data = {'username': 'olduser@synnefo.org',
843
                      'password': 'password'}
844
        r = cl_olduser.post(ui_url('local'), login_data, follow=True)
845
        self.assertContains(r, "login/shibboleth'>Academic login")
846
        Group.objects.all().delete()
847

    
848

    
849
class TestAuthProvidersAPI(TestCase):
850
    """
851
    Test auth_providers module API
852
    """
853

    
854
    def tearDown(self):
855
        Group.objects.all().delete()
856

    
857
    @im_settings(IM_MODULES=['local', 'shibboleth'])
858
    def test_create(self):
859
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
860
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
861

    
862
        module = 'shibboleth'
863
        identifier = 'SHIB_UUID'
864
        provider_params = {
865
            'affiliation': 'UNIVERSITY',
866
            'info': {'age': 27}
867
        }
868
        provider = auth.get_provider(module, user2, identifier,
869
                                     **provider_params)
870
        provider.add_to_user()
871
        provider = auth.get_provider(module, user, identifier,
872
                                     **provider_params)
873
        provider.add_to_user()
874
        user.email_verified = True
875
        user.save()
876
        self.assertRaises(Exception, provider.add_to_user)
877
        provider = user.get_auth_provider(module, identifier)
878
        self.assertEqual(user.get_auth_provider(
879
            module, identifier)._instance.info.get('age'), 27)
880

    
881
        module = 'local'
882
        identifier = None
883
        provider_params = {'auth_backend': 'ldap', 'info':
884
                          {'office': 'A1'}}
885
        provider = auth.get_provider(module, user, identifier,
886
                                     **provider_params)
887
        provider.add_to_user()
888
        self.assertFalse(provider.get_add_policy)
889
        self.assertRaises(Exception, provider.add_to_user)
890

    
891
        shib = user.get_auth_provider('shibboleth',
892
                                      'SHIB_UUID')
893
        self.assertTrue(shib.get_remove_policy)
894

    
895
        local = user.get_auth_provider('local')
896
        self.assertTrue(local.get_remove_policy)
897

    
898
        local.remove_from_user()
899
        self.assertFalse(shib.get_remove_policy)
900
        self.assertRaises(Exception, shib.remove_from_user)
901

    
902
        provider = user.get_auth_providers()[0]
903
        self.assertRaises(Exception, provider.add_to_user)
904

    
905
    @im_settings(IM_MODULES=['local', 'shibboleth'])
906
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
907
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
908
                                                 'group2'])
909
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
910
                        CREATION_GROUPS_POLICY=['localgroup-create',
911
                                                'group-create'])
912
    def test_add_groups(self):
913
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
914
        provider = auth.get_provider('shibboleth', user, 'test123')
915
        provider.add_to_user()
916
        user = AstakosUser.objects.get()
917
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
918
                              sorted([u'group1', u'group2', u'group-create']))
919

    
920
        local = auth.get_provider('local', user)
921
        local.add_to_user()
922
        provider = user.get_auth_provider('shibboleth')
923
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
924
        provider.remove_from_user()
925
        user = AstakosUser.objects.get()
926
        self.assertEqual(len(user.get_auth_providers()), 1)
927
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
928
                              sorted([u'group-create', u'localgroup']))
929

    
930
        local = user.get_auth_provider('local')
931
        self.assertRaises(Exception, local.remove_from_user)
932
        provider = auth.get_provider('shibboleth', user, 'test123')
933
        provider.add_to_user()
934
        user = AstakosUser.objects.get()
935
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
936
                              sorted([u'group-create', u'group1', u'group2',
937
                               u'localgroup']))
938
        Group.objects.all().delete()
939

    
940

    
941

    
942
    @im_settings(IM_MODULES=['local', 'shibboleth'])
943
    def test_policies(self):
944
        group_old, created = Group.objects.get_or_create(name='olduser')
945

    
946
        astakos_settings.MODERATION_ENABLED = True
947
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
948
            ['academic-user']
949
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
950
            ['google-user']
951

    
952
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
953
        user.groups.add(group_old)
954
        user.add_auth_provider('local')
955

    
956
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
957
        user2.add_auth_provider('shibboleth', identifier='shibid')
958

    
959
        user3 = AstakosUser.objects.create(email="kpap3@synnefo.org")
960
        user3.groups.add(group_old)
961
        user3.add_auth_provider('local')
962
        user3.add_auth_provider('shibboleth', identifier='1234')
963

    
964
        self.assertTrue(user2.groups.get(name='academic-user'))
965
        self.assertFalse(user2.groups.filter(name='olduser').count())
966

    
967
        local = auth_providers.get_provider('local')
968
        self.assertTrue(local.get_add_policy)
969

    
970
        academic_group = Group.objects.get(name='academic-user')
971
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
972
                                                     academic_group,
973
                                                     exclusive=True,
974
                                                     add=False,
975
                                                     login=False)
976
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
977
                                                     academic_group,
978
                                                     exclusive=True,
979
                                                     login=False,
980
                                                     add=False)
981
        # no duplicate entry gets created
982
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
983

    
984
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
985
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
986
                                                     user2,
987
                                                     remove=False)
988
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
989

    
990
        local = auth_providers.get_provider('local', user2)
991
        google = auth_providers.get_provider('google', user2)
992
        shibboleth = auth_providers.get_provider('shibboleth', user2)
993
        self.assertTrue(shibboleth.get_login_policy)
994
        self.assertFalse(shibboleth.get_remove_policy)
995
        self.assertFalse(local.get_add_policy)
996
        self.assertFalse(local.get_add_policy)
997
        self.assertFalse(google.get_add_policy)
998

    
999
        user2.groups.remove(Group.objects.get(name='academic-user'))
1000
        self.assertTrue(local.get_add_policy)
1001
        self.assertTrue(google.get_add_policy)
1002
        user2.groups.add(Group.objects.get(name='academic-user'))
1003

    
1004
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1005
                                                     user2,
1006
                                                     exclusive=True,
1007
                                                     add=True)
1008
        self.assertTrue(local.get_add_policy)
1009
        self.assertTrue(google.get_add_policy)
1010

    
1011
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1012
        self.assertFalse(local.get_automoderate_policy)
1013
        self.assertFalse(google.get_automoderate_policy)
1014
        self.assertTrue(shibboleth.get_automoderate_policy)
1015

    
1016
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1017
                  'GOOGLE_ADD_GROUPS_POLICY']:
1018
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1019

    
1020

    
1021
    @shibboleth_settings(CREATE_POLICY=True)
1022
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1023
    def test_create_http(self):
1024
        # this should be wrapped inside a transaction
1025
        user = AstakosUser(email="test@test.com")
1026
        user.save()
1027
        provider = auth_providers.get_provider('shibboleth', user,
1028
                                               'test@academia.test')
1029
        provider.add_to_user()
1030
        user.get_auth_provider('shibboleth', 'test@academia.test')
1031
        provider = auth_providers.get_provider('local', user)
1032
        provider.add_to_user()
1033
        user.get_auth_provider('local')
1034

    
1035
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1036
        user = AstakosUser(email="test2@test.com")
1037
        user.save()
1038
        provider = auth_providers.get_provider('shibboleth', user,
1039
                                               'test@shibboleth.com',
1040
                                               **{'info': {'name':
1041
                                                                'User Test'}})
1042
        self.assertFalse(provider.get_create_policy)
1043
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1044
        self.assertTrue(provider.get_create_policy)
1045
        academic = provider.add_to_user()
1046

    
1047
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1048
    @shibboleth_settings(LIMIT_POLICY=2)
1049
    def test_policies(self):
1050
        user = get_local_user('kpap@synnefo.org')
1051
        user.add_auth_provider('shibboleth', identifier='1234')
1052
        user.add_auth_provider('shibboleth', identifier='12345')
1053

    
1054
        # default limit is 1
1055
        local = user.get_auth_provider('local')
1056
        self.assertEqual(local.get_add_policy, False)
1057

    
1058
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1059
        academic = user.get_auth_provider('shibboleth',
1060
                                          identifier='1234')
1061
        self.assertEqual(academic.get_add_policy, False)
1062
        newacademic = auth_providers.get_provider('shibboleth', user,
1063
                                                  identifier='123456')
1064
        self.assertEqual(newacademic.get_add_policy, True)
1065
        user.add_auth_provider('shibboleth', identifier='123456')
1066
        self.assertEqual(academic.get_add_policy, False)
1067
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1068

    
1069
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1070
    @shibboleth_settings(LIMIT_POLICY=2)
1071
    def test_messages(self):
1072
        user = get_local_user('kpap@synnefo.org')
1073
        user.add_auth_provider('shibboleth', identifier='1234')
1074
        user.add_auth_provider('shibboleth', identifier='12345')
1075
        provider = auth_providers.get_provider('shibboleth')
1076
        self.assertEqual(provider.get_message('title'), 'Academic')
1077
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1078
        # regenerate messages cache
1079
        provider = auth_providers.get_provider('shibboleth')
1080
        self.assertEqual(provider.get_message('title'), 'New title')
1081
        self.assertEqual(provider.get_message('login_title'),
1082
                         'New title LOGIN')
1083
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1084
        self.assertEqual(provider.get_module_icon,
1085
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1086
        self.assertEqual(provider.get_module_medium_icon,
1087
                         settings.MEDIA_URL +
1088
                         'im/auth/icons-medium/shibboleth.png')
1089

    
1090
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1091
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1092
        self.assertEqual(provider.get_method_details_msg,
1093
                         'Account: 12345')
1094
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1095
        self.assertEqual(provider.get_method_details_msg,
1096
                         'Account: 1234')
1097

    
1098
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1099
        self.assertEqual(provider.get_not_active_msg,
1100
                         "'Academic login' is disabled.")
1101

    
1102
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1103
    @shibboleth_settings(LIMIT_POLICY=2)
1104
    def test_templates(self):
1105
        user = get_local_user('kpap@synnefo.org')
1106
        user.add_auth_provider('shibboleth', identifier='1234')
1107
        user.add_auth_provider('shibboleth', identifier='12345')
1108

    
1109
        provider = auth_providers.get_provider('shibboleth')
1110
        self.assertEqual(provider.get_template('login'),
1111
                         'im/auth/shibboleth_login.html')
1112
        provider = auth_providers.get_provider('google')
1113
        self.assertEqual(provider.get_template('login'),
1114
                         'im/auth/generic_login.html')
1115

    
1116

    
1117
class TestActivationBackend(TestCase):
1118

    
1119
    def setUp(self):
1120
        # dummy call to pass through logging middleware
1121
        self.client.get(ui_url(''))
1122

    
1123
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1124
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1125
    def test_policies(self):
1126
        backend = activation_backends.get_backend()
1127

    
1128
        # email matches RE_USER_EMAIL_PATTERNS
1129
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1130
                               is_active=False, email_verified=False)
1131
        backend.handle_verification(user1, user1.verification_code)
1132
        self.assertEqual(user1.accepted_policy, 'email')
1133

    
1134
        # manually moderated
1135
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1136
                               is_active=False, email_verified=False)
1137

    
1138
        backend.handle_verification(user2, user2.verification_code)
1139
        self.assertEqual(user2.moderated, False)
1140
        backend.handle_moderation(user2)
1141
        self.assertEqual(user2.moderated, True)
1142
        self.assertEqual(user2.accepted_policy, 'manual')
1143

    
1144
        # autoaccept due to provider automoderate policy
1145
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1146
                               is_active=False, email_verified=False)
1147
        user3.auth_providers.all().delete()
1148
        user3.add_auth_provider('shibboleth', identifier='shib123')
1149
        backend.handle_verification(user3, user3.verification_code)
1150
        self.assertEqual(user3.moderated, True)
1151
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1152

    
1153
    @im_settings(MODERATION_ENABLED=False,
1154
                 MANAGERS=(('Manager',
1155
                            'manager@synnefo.org'),),
1156
                 HELPDESK=(('Helpdesk',
1157
                            'helpdesk@synnefo.org'),),
1158
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1159
    def test_without_moderation(self):
1160
        backend = activation_backends.get_backend()
1161
        form = backend.get_signup_form('local')
1162
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1163

    
1164
        user_data = {
1165
            'email': 'kpap@synnefo.org',
1166
            'first_name': 'Kostas Papas',
1167
            'password1': '123',
1168
            'password2': '123'
1169
        }
1170
        form = backend.get_signup_form('local', user_data)
1171
        user = form.save(commit=False)
1172
        form.store_user(user)
1173
        self.assertEqual(user.is_active, False)
1174
        self.assertEqual(user.email_verified, False)
1175

    
1176
        # step one, registration
1177
        result = backend.handle_registration(user)
1178
        user = AstakosUser.objects.get()
1179
        self.assertEqual(user.is_active, False)
1180
        self.assertEqual(user.email_verified, False)
1181
        self.assertTrue(user.verification_code)
1182
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1183
        backend.send_result_notifications(result, user)
1184
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1185
        self.assertEqual(len(mail.outbox), 1)
1186

    
1187
        # step two, verify email (automatically
1188
        # moderates/accepts user, since moderation is disabled)
1189
        user = AstakosUser.objects.get()
1190
        valid_code = user.verification_code
1191

    
1192
        # test invalid code
1193
        result = backend.handle_verification(user, valid_code)
1194
        backend.send_result_notifications(result, user)
1195
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1196
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1197
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1198
        # verification + activated + greeting = 3
1199
        self.assertEqual(len(mail.outbox), 3)
1200
        user = AstakosUser.objects.get()
1201
        self.assertEqual(user.is_active, True)
1202
        self.assertEqual(user.moderated, True)
1203
        self.assertTrue(user.moderated_at)
1204
        self.assertEqual(user.email_verified, True)
1205
        self.assertTrue(user.activation_sent)
1206

    
1207
    @im_settings(MODERATION_ENABLED=True,
1208
                 MANAGERS=(('Manager',
1209
                            'manager@synnefo.org'),),
1210
                 HELPDESK=(('Helpdesk',
1211
                            'helpdesk@synnefo.org'),),
1212
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1213
    def test_with_moderation(self):
1214

    
1215
        backend = activation_backends.get_backend()
1216
        form = backend.get_signup_form('local')
1217
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1218

    
1219
        user_data = {
1220
            'email': 'kpap@synnefo.org',
1221
            'first_name': 'Kostas Papas',
1222
            'password1': '123',
1223
            'password2': '123'
1224
        }
1225
        form = backend.get_signup_form(provider='local',
1226
                                       initial_data=user_data)
1227
        user = form.save(commit=False)
1228
        form.store_user(user)
1229
        self.assertEqual(user.is_active, False)
1230
        self.assertEqual(user.email_verified, False)
1231

    
1232
        # step one, registration
1233
        result = backend.handle_registration(user)
1234
        user = AstakosUser.objects.get()
1235
        self.assertEqual(user.is_active, False)
1236
        self.assertEqual(user.email_verified, False)
1237
        self.assertTrue(user.verification_code)
1238
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1239
        backend.send_result_notifications(result, user)
1240
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1241
        self.assertEqual(len(mail.outbox), 1)
1242

    
1243
        # step two, verifying email
1244
        user = AstakosUser.objects.get()
1245
        valid_code = user.verification_code
1246
        invalid_code = user.verification_code + 'invalid'
1247

    
1248
        # test invalid code
1249
        result = backend.handle_verification(user, invalid_code)
1250
        self.assertEqual(result.status, backend.Result.ERROR)
1251
        backend.send_result_notifications(result, user)
1252
        user = AstakosUser.objects.get()
1253
        self.assertEqual(user.is_active, False)
1254
        self.assertEqual(user.moderated, False)
1255
        self.assertEqual(user.moderated_at, None)
1256
        self.assertEqual(user.email_verified, False)
1257
        self.assertTrue(user.activation_sent)
1258

    
1259
        # test valid code
1260
        user = AstakosUser.objects.get()
1261
        result = backend.handle_verification(user, valid_code)
1262
        backend.send_result_notifications(result, user)
1263
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1264
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1265
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1266
        self.assertEqual(len(mail.outbox), 2)
1267
        user = AstakosUser.objects.get()
1268
        self.assertEqual(user.moderated, False)
1269
        self.assertEqual(user.moderated_at, None)
1270
        self.assertEqual(user.email_verified, True)
1271
        self.assertTrue(user.activation_sent)
1272

    
1273
        # test code reuse
1274
        result = backend.handle_verification(user, valid_code)
1275
        self.assertEqual(result.status, backend.Result.ERROR)
1276
        user = AstakosUser.objects.get()
1277
        self.assertEqual(user.is_active, False)
1278
        self.assertEqual(user.moderated, False)
1279
        self.assertEqual(user.moderated_at, None)
1280
        self.assertEqual(user.email_verified, True)
1281
        self.assertTrue(user.activation_sent)
1282

    
1283
        # valid code on verified user
1284
        user = AstakosUser.objects.get()
1285
        valid_code = user.verification_code
1286
        result = backend.handle_verification(user, valid_code)
1287
        self.assertEqual(result.status, backend.Result.ERROR)
1288

    
1289
        # step three, moderation user
1290
        user = AstakosUser.objects.get()
1291
        result = backend.handle_moderation(user)
1292
        backend.send_result_notifications(result, user)
1293

    
1294
        user = AstakosUser.objects.get()
1295
        self.assertEqual(user.is_active, True)
1296
        self.assertEqual(user.moderated, True)
1297
        self.assertTrue(user.moderated_at)
1298
        self.assertEqual(user.email_verified, True)
1299
        self.assertTrue(user.activation_sent)