Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests / auth.py @ 6ce8ecce

History | View | Annotate | Download (59.8 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import urlparse
35
import urllib
36

    
37
from astakos.im.tests.common import *
38

    
39
ui_url = lambda url: '/' + astakos_settings.BASE_PATH + '/ui/%s' % url
40

    
41

    
42
class ShibbolethTests(TestCase):
43
    """
44
    Testing shibboleth authentication.
45
    """
46

    
47
    def setUp(self):
48
        self.client = ShibbolethClient()
49
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
50
        astakos_settings.MODERATION_ENABLED = True
51

    
52
    def tearDown(self):
53
        AstakosUser.objects.all().delete()
54

    
55
    @im_settings(FORCE_PROFILE_UPDATE=False)
56
    def test_create_account(self):
57

    
58
        client = ShibbolethClient()
59

    
60
        # shibboleth views validation
61
        # eepn required
62
        r = client.get(ui_url('login/shibboleth?'), follow=True)
63
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
64
            'domain': astakos_settings.BASE_HOST,
65
            'contact_email': settings.CONTACT_EMAIL
66
        })
67
        client.set_tokens(eppn="kpapeppn")
68

    
69
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
70
        # shibboleth user info required
71
        r = client.get(ui_url('login/shibboleth?'), follow=True)
72
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
73
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
74

    
75
        # shibboleth logged us in
76
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
77
                          cn="Kostas Papadimitriou",
78
                          ep_affiliation="Test Affiliation")
79
        r = client.get(ui_url('login/shibboleth?'), follow=True)
80
        token = PendingThirdPartyUser.objects.get().token
81
        self.assertRedirects(r, ui_url('signup?third_party_token=%s' % token))
82
        self.assertEqual(r.status_code, 200)
83

    
84
        # a new pending user created
85
        pending_user = PendingThirdPartyUser.objects.get(
86
            third_party_identifier="kpapeppn")
87
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
88
        # keep the token for future use
89
        token = pending_user.token
90
        # from now on no shibboleth headers are sent to the server
91
        client.reset_tokens()
92

    
93
        # this is the old way, it should fail, to avoid pending user take over
94
        r = client.get(ui_url('shibboleth/signup/%s' % pending_user.username))
95
        self.assertEqual(r.status_code, 404)
96

    
97
        # this is the signup unique url associated with the pending user
98
        # created
99
        r = client.get(ui_url('signup/?third_party_token=%s' % token))
100
        identifier = pending_user.third_party_identifier
101
        post_data = {'third_party_identifier': identifier,
102
                     'first_name': 'Kostas',
103
                     'third_party_token': token,
104
                     'last_name': 'Mitroglou',
105
                     'provider': 'shibboleth'}
106

    
107
        signup_url = reverse('signup')
108

    
109
        # invlid email
110
        post_data['email'] = 'kpap'
111
        r = client.post(signup_url, post_data)
112
        self.assertContains(r, token)
113

    
114
        # existing email
115
        existing_user = get_local_user('test@test.com')
116
        post_data['email'] = 'test@test.com'
117
        r = client.post(signup_url, post_data)
118
        self.assertContains(r, messages.EMAIL_USED)
119
        existing_user.delete()
120

    
121
        # and finally a valid signup
122
        post_data['email'] = 'kpap@synnefo.org'
123
        r = client.post(signup_url, post_data, follow=True)
124
        self.assertContains(r, messages.VERIFICATION_SENT)
125

    
126
        # entires commited as expected
127
        self.assertEqual(AstakosUser.objects.count(), 1)
128
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
129
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
130

    
131
        # provider info stored
132
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
133
        self.assertEqual(provider.affiliation, 'Test Affiliation')
134
        self.assertEqual(provider.info, {u'email': u'kpap@synnefo.org',
135
                                         u'eppn': u'kpapeppn',
136
                                         u'name': u'Kostas Papadimitriou'})
137

    
138
        # login (not activated yet)
139
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
140
                          cn="Kostas Papadimitriou", )
141
        r = client.get(ui_url("login/shibboleth?"), follow=True)
142
        self.assertContains(r, 'is pending moderation')
143

    
144
        # admin activates the user
145
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
146
        backend = activation_backends.get_backend()
147
        activation_result = backend.verify_user(u, u.verification_code)
148
        activation_result = backend.accept_user(u)
149
        self.assertFalse(activation_result.is_error())
150
        backend.send_result_notifications(activation_result, u)
151
        self.assertEqual(u.is_active, True)
152

    
153
        # we see our profile
154
        r = client.get(ui_url("login/shibboleth?"), follow=True)
155
        self.assertRedirects(r, ui_url('landing'))
156
        self.assertEqual(r.status_code, 200)
157

    
158
    def test_existing(self):
159
        """
160
        Test adding of third party login to an existing account
161
        """
162

    
163
        # this is our existing user
164
        existing_user = get_local_user('kpap@synnefo.org')
165
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
166
        existing_inactive.is_active = False
167
        existing_inactive.save()
168

    
169
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
170
        existing_unverified.is_active = False
171
        existing_unverified.activation_sent = None
172
        existing_unverified.email_verified = False
173
        existing_unverified.is_verified = False
174
        existing_unverified.save()
175

    
176
        client = ShibbolethClient()
177
        # shibboleth logged us in, notice that we use different email
178
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
179
                          cn="Kostas Papadimitriou", )
180
        r = client.get(ui_url("login/shibboleth?"), follow=True)
181

    
182
        # a new pending user created
183
        pending_user = PendingThirdPartyUser.objects.get()
184
        token = pending_user.token
185
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
186
        pending_key = pending_user.token
187
        client.reset_tokens()
188
        self.assertRedirects(r, ui_url("signup?third_party_token=%s" % token))
189

    
190
        form = r.context['login_form']
191
        signupdata = copy.copy(form.initial)
192
        signupdata['email'] = 'kpap@synnefo.org'
193
        signupdata['third_party_token'] = token
194
        signupdata['provider'] = 'shibboleth'
195
        signupdata.pop('id', None)
196

    
197
        # the email exists to another user
198
        r = client.post(ui_url("signup"), signupdata)
199
        self.assertContains(r, "There is already an account with this email "
200
                               "address")
201
        # change the case, still cannot create
202
        signupdata['email'] = 'KPAP@synnefo.org'
203
        r = client.post(ui_url("signup"), signupdata)
204
        self.assertContains(r, "There is already an account with this email "
205
                               "address")
206
        # inactive user
207
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
208
        r = client.post(ui_url("signup"), signupdata)
209
        self.assertContains(r, "There is already an account with this email "
210
                               "address")
211

    
212
        # unverified user, this should pass, old entry will be deleted
213
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
214
        r = client.post(ui_url("signup"), signupdata)
215

    
216
        post_data = {'password': 'password',
217
                     'username': 'kpap@synnefo.org'}
218
        r = client.post(ui_url('local'), post_data, follow=True)
219
        self.assertTrue(r.context['request'].user.is_authenticated())
220
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
221
                          cn="Kostas Papadimitriou", )
222
        r = client.get(ui_url("login/shibboleth?"), follow=True)
223
        self.assertContains(r, "enabled for this account")
224
        client.reset_tokens()
225

    
226
        user = existing_user
227
        self.assertTrue(user.has_auth_provider('shibboleth'))
228
        self.assertTrue(user.has_auth_provider('local',
229
                                               auth_backend='astakos'))
230
        client.logout()
231

    
232
        # look Ma, i can login with both my shibboleth and local account
233
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
234
                          cn="Kostas Papadimitriou")
235
        r = client.get(ui_url("login/shibboleth?"), follow=True)
236
        self.assertTrue(r.context['request'].user.is_authenticated())
237
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
238
        self.assertRedirects(r, ui_url('landing'))
239
        self.assertEqual(r.status_code, 200)
240
        client.logout()
241
        client.reset_tokens()
242

    
243
        # logged out
244
        r = client.get(ui_url("profile"), follow=True)
245
        self.assertFalse(r.context['request'].user.is_authenticated())
246

    
247
        # login with local account also works
248
        post_data = {'password': 'password',
249
                     'username': 'kpap@synnefo.org'}
250
        r = self.client.post(ui_url('local'), post_data, follow=True)
251
        self.assertTrue(r.context['request'].user.is_authenticated())
252
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
253
        self.assertRedirects(r, ui_url('landing'))
254
        self.assertEqual(r.status_code, 200)
255

    
256
        # cannot add the same eppn
257
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
258
                          cn="Kostas Papadimitriou", )
259
        r = client.get(ui_url("login/shibboleth?"), follow=True)
260
        self.assertRedirects(r, ui_url('landing'))
261
        self.assertTrue(r.status_code, 200)
262
        self.assertEquals(existing_user.auth_providers.count(), 2)
263

    
264
        # only one allowed by default
265
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
266
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
267
        prov = auth_providers.get_provider('shibboleth')
268
        r = client.get(ui_url("login/shibboleth?"), follow=True)
269
        self.assertContains(r, "Failed to add")
270
        self.assertRedirects(r, ui_url('profile'))
271
        self.assertTrue(r.status_code, 200)
272
        self.assertEquals(existing_user.auth_providers.count(), 2)
273
        client.logout()
274
        client.reset_tokens()
275

    
276
        # cannot login with another eppn
277
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppninvalid",
278
                          cn="Kostas Papadimitriou")
279
        r = client.get(ui_url("login/shibboleth?"), follow=True)
280
        self.assertFalse(r.context['request'].user.is_authenticated())
281

    
282
        # cannot
283

    
284
        # lets remove local password
285
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
286
                                       email="kpap@synnefo.org")
287
        remove_local_url = user.get_auth_provider('local').get_remove_url
288
        remove_shibbo_url = user.get_auth_provider('shibboleth',
289
                                                   'kpapeppn').get_remove_url
290
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
291
                          cn="Kostas Papadimtriou")
292
        r = client.get(ui_url("login/shibboleth?"), follow=True)
293
        client.reset_tokens()
294

    
295
        # only POST is allowed (for CSRF protection)
296
        r = client.get(remove_local_url, follow=True)
297
        self.assertEqual(r.status_code, 405)
298

    
299
        r = client.post(remove_local_url, follow=True)
300
        # 2 providers left
301
        self.assertEqual(user.auth_providers.count(), 1)
302
        # cannot remove last provider
303
        r = client.post(remove_shibbo_url)
304
        self.assertEqual(r.status_code, 403)
305
        self.client.logout()
306

    
307
        # cannot login using local credentials (notice we use another client)
308
        post_data = {'password': 'password',
309
                     'username': 'kpap@synnefo.org'}
310
        r = self.client.post(ui_url('local'), post_data, follow=True)
311
        self.assertFalse(r.context['request'].user.is_authenticated())
312

    
313
        # we can reenable the local provider by setting a password
314
        r = client.get(ui_url("password_change"), follow=True)
315
        r = client.post(ui_url("password_change"), {'new_password1': '111',
316
                                                'new_password2': '111'},
317
                        follow=True)
318
        user = r.context['request'].user
319
        self.assertTrue(user.has_auth_provider('local'))
320
        self.assertTrue(user.has_auth_provider('shibboleth'))
321
        self.assertTrue(user.check_password('111'))
322
        self.assertTrue(user.has_usable_password())
323
        self.client.logout()
324

    
325
        # now we can login
326
        post_data = {'password': '111',
327
                     'username': 'kpap@synnefo.org'}
328
        r = self.client.post(ui_url('local'), post_data, follow=True)
329
        self.assertTrue(r.context['request'].user.is_authenticated())
330

    
331
        client.reset_tokens()
332

    
333
        # we cannot take over another shibboleth identifier
334
        user2 = get_local_user('another@synnefo.org')
335
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
336
        # login
337
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
338
                          cn="Kostas Papadimitriou")
339
        r = client.get(ui_url("login/shibboleth?"), follow=True)
340
        # try to assign existing shibboleth identifier of another user
341
        client.set_tokens(mail="kpap_second@shibboleth.gr",
342
                          eppn="existingeppn", cn="Kostas Papadimitriou")
343
        r = client.get(ui_url("login/shibboleth?"), follow=True)
344
        self.assertContains(r, "is already in use")
345

    
346

    
347
class TestLocal(TestCase):
348

    
349
    def setUp(self):
350
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
351
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
352
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
353
        settings.ASTAKOS_MODERATION_ENABLED = True
354

    
355
    def tearDown(self):
356
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
357
        AstakosUser.objects.all().delete()
358

    
359
    def test_no_moderation(self):
360
        # disable moderation
361
        astakos_settings.MODERATION_ENABLED = False
362

    
363
        # create a new user
364
        r = self.client.get(ui_url("signup"))
365
        self.assertEqual(r.status_code, 200)
366
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
367
                'password2': 'password', 'first_name': 'Kostas',
368
                'last_name': 'Mitroglou', 'provider': 'local'}
369
        r = self.client.post(ui_url("signup"), data)
370

    
371
        # user created
372
        self.assertEqual(AstakosUser.objects.count(), 1)
373
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
374
                                       email="kpap@synnefo.org")
375
        self.assertEqual(user.username, 'kpap@synnefo.org')
376
        self.assertEqual(user.has_auth_provider('local'), True)
377
        self.assertFalse(user.is_active)
378

    
379
        # user (but not admin) gets notified
380
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
381
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
382
        astakos_settings.MODERATION_ENABLED = True
383

    
384
    def test_email_case(self):
385
        data = {
386
            'email': 'kPap@synnefo.org',
387
            'password1': '1234',
388
            'password2': '1234'
389
        }
390

    
391
        form = forms.LocalUserCreationForm(data)
392
        self.assertTrue(form.is_valid())
393
        user = form.save()
394
        form.store_user(user, {})
395

    
396
        u = AstakosUser.objects.get()
397
        self.assertEqual(u.email, 'kPap@synnefo.org')
398
        self.assertEqual(u.username, 'kpap@synnefo.org')
399
        u.is_active = True
400
        u.email_verified = True
401
        u.save()
402

    
403
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
404
        login = forms.LoginForm(data=data)
405
        self.assertTrue(login.is_valid())
406

    
407
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
408
        login = forms.LoginForm(data=data)
409
        self.assertTrue(login.is_valid())
410

    
411
        data = {
412
            'email': 'kpap@synnefo.org',
413
            'password1': '1234',
414
            'password2': '1234'
415
        }
416
        form = forms.LocalUserCreationForm(data)
417
        self.assertFalse(form.is_valid())
418

    
419
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
420
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
421
    def test_local_provider(self):
422
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
423

    
424
        # create a user
425
        r = self.client.get(ui_url("signup"))
426
        self.assertEqual(r.status_code, 200)
427
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
428
                'password2': 'password', 'first_name': 'Kostas',
429
                'last_name': 'Mitroglou', 'provider': 'local'}
430
        r = self.client.post(ui_url("signup"), data)
431

    
432
        # user created
433
        self.assertEqual(AstakosUser.objects.count(), 1)
434
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
435
                                       email="kpap@synnefo.org")
436
        self.assertEqual(user.username, 'kpap@synnefo.org')
437
        self.assertEqual(user.has_auth_provider('local'), True)
438
        self.assertFalse(user.is_active)  # not activated
439
        self.assertFalse(user.email_verified)  # not verified
440
        self.assertTrue(user.activation_sent)  # activation automatically sent
441
        self.assertFalse(user.moderated)
442
        self.assertFalse(user.email_verified)
443

    
444
        # admin gets notified and activates the user from the command line
445
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
446
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
447
                                           'password': 'password'},
448
                             follow=True)
449
        self.assertContains(r, messages.VERIFICATION_SENT)
450
        backend = activation_backends.get_backend()
451

    
452
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
453
        backend.send_user_verification_email(user)
454

    
455
        # user activation fields updated and user gets notified via email
456
        user = AstakosUser.objects.get(pk=user.pk)
457
        self.assertTrue(user.activation_sent)
458
        self.assertFalse(user.email_verified)
459
        self.assertFalse(user.is_active)
460
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
461

    
462
        # user forgot she got registered and tries to submit registration
463
        # form. Notice the upper case in email
464
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
465
                'password2': 'password', 'first_name': 'Kostas',
466
                'last_name': 'Mitroglou', 'provider': 'local'}
467
        r = self.client.post(ui_url("signup"), data, follow=True)
468
        self.assertRedirects(r, reverse('login'))
469
        self.assertContains(r, messages.VERIFICATION_SENT)
470

    
471
        user = AstakosUser.objects.get()
472
        # previous user replaced
473
        self.assertTrue(user.activation_sent)
474
        self.assertFalse(user.email_verified)
475
        self.assertFalse(user.is_active)
476
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
477

    
478
        # hmmm, email exists; lets request a password change
479
        r = self.client.get(ui_url('local/password_reset'))
480
        self.assertEqual(r.status_code, 200)
481
        data = {'email': 'kpap@synnefo.org'}
482
        r = self.client.post(ui_url('local/password_reset'), data, follow=True)
483
        # she can't because account is not active yet
484
        self.assertContains(r, 'pending activation')
485

    
486
        # moderation is enabled and an activation email has already been sent
487
        # so user can trigger resend of the activation email
488
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
489
                            follow=True)
490
        self.assertContains(r, 'has been sent to your email address.')
491
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
492

    
493
        # also she cannot login
494
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
495
        r = self.client.post(ui_url('local'), data, follow=True)
496
        self.assertContains(r, 'Resend activation')
497
        self.assertFalse(r.context['request'].user.is_authenticated())
498
        self.assertFalse('_pithos2_a' in self.client.cookies)
499

    
500
        # user sees the message and resends activation
501
        r = self.client.get(ui_url('send/activation/%d' % user.pk),
502
                            follow=True)
503
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
504

    
505
        # logged in user cannot activate another account
506
        tmp_user = get_local_user("test_existing_user@synnefo.org")
507
        tmp_client = Client()
508
        tmp_client.login(username="test_existing_user@synnefo.org",
509
                         password="password")
510
        r = tmp_client.get(user.get_activation_url(), follow=True)
511
        self.assertContains(r, messages.LOGGED_IN_WARNING)
512

    
513
        r = self.client.get(user.get_activation_url(), follow=True)
514
        # previous code got invalidated
515
        self.assertEqual(r.status_code, 404)
516

    
517
        user = AstakosUser.objects.get(pk=user.pk)
518
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
519
        r = self.client.get(user.get_activation_url(), follow=True)
520
        self.assertRedirects(r, reverse('login'))
521
        # user sees that account is pending approval from admins
522
        self.assertContains(r, messages.NOTIFICATION_SENT)
523
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
524

    
525
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
526
        result = backend.handle_moderation(user)
527
        backend.send_result_notifications(result, user)
528
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
529
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
530

    
531
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
532
        r = self.client.get(ui_url('profile'), follow=True)
533
        self.assertFalse(r.context['request'].user.is_authenticated())
534
        self.assertFalse('_pithos2_a' in self.client.cookies)
535
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
536

    
537
        user = AstakosUser.objects.get(pk=user.pk)
538
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
539
                                               'password': 'password'},
540
                             follow=True)
541
        # user activated and logged in, token cookie set
542
        self.assertTrue(r.context['request'].user.is_authenticated())
543
        self.assertTrue('_pithos2_a' in self.client.cookies)
544
        cookies = self.client.cookies
545
        self.assertTrue(quote(user.auth_token) in
546
                        cookies.get('_pithos2_a').value)
547
        r = self.client.get(ui_url('logout'), follow=True)
548
        r = self.client.get(ui_url(''), follow=True)
549
        self.assertRedirects(r, ui_url('login'))
550
        # user logged out, token cookie removed
551
        self.assertFalse(r.context['request'].user.is_authenticated())
552
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
553

    
554
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
555
        del self.client.cookies['_pithos2_a']
556

    
557
        # user can login
558
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
559
                                               'password': 'password'},
560
                             follow=True)
561
        self.assertTrue(r.context['request'].user.is_authenticated())
562
        self.assertTrue('_pithos2_a' in self.client.cookies)
563
        cookies = self.client.cookies
564
        self.assertTrue(quote(user.auth_token) in
565
                        cookies.get('_pithos2_a').value)
566
        self.client.get(ui_url('logout'), follow=True)
567

    
568
        # user forgot password
569
        old_pass = user.password
570
        r = self.client.get(ui_url('local/password_reset'))
571
        self.assertEqual(r.status_code, 200)
572
        r = self.client.post(ui_url('local/password_reset'),
573
                             {'email': 'kpap@synnefo.org'})
574
        self.assertEqual(r.status_code, 302)
575
        # email sent
576
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
577

    
578
        # user visits change password link
579
        user = AstakosUser.objects.get(pk=user.pk)
580
        r = self.client.get(user.get_password_reset_url())
581
        r = self.client.post(user.get_password_reset_url(),
582
                             {'new_password1': 'newpass',
583
                              'new_password2': 'newpass'})
584

    
585
        user = AstakosUser.objects.get(pk=user.pk)
586
        self.assertNotEqual(old_pass, user.password)
587

    
588
        # old pass is not usable
589
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
590
                                               'password': 'password'})
591
        self.assertContains(r, 'Please enter a correct username and password')
592
        r = self.client.post(ui_url('local'), {'username': 'kpap@synnefo.org',
593
                                               'password': 'newpass'},
594
                             follow=True)
595
        self.assertTrue(r.context['request'].user.is_authenticated())
596
        self.client.logout()
597

    
598
        # tests of special local backends
599
        user = AstakosUser.objects.get(pk=user.pk)
600
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
601
        user.save()
602

    
603
        # non astakos local backends do not support password reset
604
        r = self.client.get(ui_url('local/password_reset'))
605
        self.assertEqual(r.status_code, 200)
606
        r = self.client.post(ui_url('local/password_reset'),
607
                             {'email': 'kpap@synnefo.org'})
608
        # she can't because account is not active yet
609
        self.assertContains(r, "Changing password is not")
610

    
611

    
612
class UserActionsTests(TestCase):
613

    
614
    def test_email_change(self):
615
        # to test existing email validation
616
        get_local_user('existing@synnefo.org')
617

    
618
        # local user
619
        user = get_local_user('kpap@synnefo.org')
620

    
621
        # login as kpap
622
        self.client.login(username='kpap@synnefo.org', password='password')
623
        r = self.client.get(ui_url('profile'), follow=True)
624
        user = r.context['request'].user
625
        self.assertTrue(user.is_authenticated())
626

    
627
        # change email is enabled
628
        r = self.client.get(ui_url('email_change'))
629
        self.assertEqual(r.status_code, 200)
630
        self.assertFalse(user.email_change_is_pending())
631

    
632
        # request email change to an existing email fails
633
        data = {'new_email_address': 'existing@synnefo.org'}
634
        r = self.client.post(ui_url('email_change'), data)
635
        self.assertContains(r, messages.EMAIL_USED)
636

    
637
        # proper email change
638
        data = {'new_email_address': 'kpap@gmail.com'}
639
        r = self.client.post(ui_url('email_change'), data, follow=True)
640
        self.assertRedirects(r, ui_url('profile'))
641
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
642
        change1 = EmailChange.objects.get()
643

    
644
        # user sees a warning
645
        r = self.client.get(ui_url('email_change'))
646
        self.assertEqual(r.status_code, 200)
647
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
648
        self.assertTrue(user.email_change_is_pending())
649

    
650
        # link was sent
651
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
652
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
653

    
654
        # proper email change
655
        data = {'new_email_address': 'kpap@yahoo.com'}
656
        r = self.client.post(ui_url('email_change'), data, follow=True)
657
        self.assertRedirects(r, ui_url('profile'))
658
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
659
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
660
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
661
        change2 = EmailChange.objects.get()
662

    
663
        r = self.client.get(change1.get_url())
664
        self.assertEquals(r.status_code, 404)
665
        self.client.logout()
666

    
667
        invalid_client = Client()
668
        r = invalid_client.post(ui_url('local?'),
669
                                {'username': 'existing@synnefo.org',
670
                                 'password': 'password'})
671
        r = invalid_client.get(change2.get_url(), follow=True)
672
        self.assertEquals(r.status_code, 403)
673

    
674
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
675
                             {'username': 'kpap@synnefo.org',
676
                              'password': 'password',
677
                              'next': change2.get_url()},
678
                             follow=True)
679
        self.assertRedirects(r, ui_url('profile'))
680
        user = r.context['request'].user
681
        self.assertEquals(user.email, 'kpap@yahoo.com')
682
        self.assertEquals(user.username, 'kpap@yahoo.com')
683

    
684
        self.client.logout()
685
        r = self.client.post(ui_url('local?next=' + change2.get_url()),
686
                             {'username': 'kpap@synnefo.org',
687
                              'password': 'password',
688
                              'next': change2.get_url()},
689
                             follow=True)
690
        self.assertContains(r, "Please enter a correct username and password")
691
        self.assertEqual(user.emailchanges.count(), 0)
692

    
693
        AstakosUser.objects.all().delete()
694
        Group.objects.all().delete()
695

    
696

    
697
class TestAuthProviderViews(TestCase):
698

    
699
    def tearDown(self):
700
        AstakosUser.objects.all().delete()
701

    
702
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
703
                         AUTOMODERATE_POLICY=True)
704
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
705
                 FORCE_PROFILE_UPDATE=False)
706
    def test_user(self):
707
        Profile = AuthProviderPolicyProfile
708
        Pending = PendingThirdPartyUser
709
        User = AstakosUser
710

    
711
        User.objects.create(email="newuser@synnefo.org")
712
        get_local_user("olduser@synnefo.org")
713
        cl_olduser = ShibbolethClient()
714
        get_local_user("olduser2@synnefo.org")
715
        ShibbolethClient()
716
        cl_newuser = ShibbolethClient()
717
        cl_newuser2 = Client()
718

    
719
        academic_group, created = Group.objects.get_or_create(
720
            name='academic-login')
721
        academic_users = academic_group.user_set
722
        assert created
723
        policy_only_academic = Profile.objects.add_policy('academic_strict',
724
                                                          'shibboleth',
725
                                                          academic_group,
726
                                                          exclusive=True,
727
                                                          login=False,
728
                                                          add=False)
729

    
730
        # new academic user
731
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
732
        cl_newuser.set_tokens(eppn="newusereppn")
733
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
734
        pending = Pending.objects.get()
735
        identifier = pending.third_party_identifier
736
        signup_data = {'third_party_identifier': identifier,
737
                       'first_name': 'Academic',
738
                       'third_party_token': pending.token,
739
                       'last_name': 'New User',
740
                       'provider': 'shibboleth'}
741
        r = cl_newuser.post(ui_url('signup'), signup_data)
742
        self.assertContains(r, "This field is required", )
743
        signup_data['email'] = 'olduser@synnefo.org'
744
        r = cl_newuser.post(ui_url('signup'), signup_data)
745
        self.assertContains(r, "already an account with this email", )
746
        signup_data['email'] = 'newuser@synnefo.org'
747
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
748
        r = cl_newuser.post(ui_url('signup'), signup_data, follow=True)
749
        self.assertEqual(r.status_code, 404)
750
        newuser = User.objects.get(email="newuser@synnefo.org")
751
        activation_link = newuser.get_activation_url()
752
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
753

    
754
        # new non-academic user
755
        signup_data = {'first_name': 'Non Academic',
756
                       'last_name': 'New User',
757
                       'provider': 'local',
758
                       'password1': 'password',
759
                       'password2': 'password'}
760
        signup_data['email'] = 'olduser@synnefo.org'
761
        r = cl_newuser2.post(ui_url('signup'), signup_data)
762
        self.assertContains(r, 'There is already an account with this '
763
                               'email address')
764
        signup_data['email'] = 'newuser@synnefo.org'
765
        r = cl_newuser2.post(ui_url('signup/'), signup_data)
766
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
767
        r = self.client.get(activation_link, follow=True)
768
        self.assertEqual(r.status_code, 404)
769
        newuser = User.objects.get(email="newuser@synnefo.org")
770
        self.assertTrue(newuser.activation_sent)
771

    
772
        # activation sent, user didn't open verification url so additional
773
        # registrations invalidate the previous signups.
774
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
775
        r = cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
776
        pending = Pending.objects.get()
777
        identifier = pending.third_party_identifier
778
        signup_data = {'third_party_identifier': identifier,
779
                       'first_name': 'Academic',
780
                       'third_party_token': pending.token,
781
                       'last_name': 'New User',
782
                       'provider': 'shibboleth'}
783
        signup_data['email'] = 'newuser@synnefo.org'
784
        r = cl_newuser.post(ui_url('signup'), signup_data)
785
        self.assertEqual(r.status_code, 302)
786
        newuser = User.objects.get(email="newuser@synnefo.org")
787
        self.assertTrue(newuser.activation_sent)
788
        activation_link = newuser.get_activation_url()
789
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
790
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
791
        self.assertRedirects(r, ui_url('landing'))
792
        newuser = User.objects.get(email="newuser@synnefo.org")
793
        self.assertEqual(newuser.is_active, True)
794
        self.assertEqual(newuser.email_verified, True)
795
        cl_newuser.logout()
796

    
797
        # cannot reactivate if suspended
798
        newuser.is_active = False
799
        newuser.save()
800
        r = cl_newuser.get(newuser.get_activation_url())
801
        newuser = User.objects.get(email="newuser@synnefo.org")
802
        self.assertFalse(newuser.is_active)
803

    
804
        # release suspension
805
        newuser.is_active = True
806
        newuser.save()
807

    
808
        cl_newuser.get(ui_url('login/shibboleth?'), follow=True)
809
        local = auth.get_provider('local', newuser)
810
        self.assertEqual(local.get_add_policy, False)
811
        self.assertEqual(local.get_login_policy, False)
812
        r = cl_newuser.get(local.get_add_url, follow=True)
813
        self.assertRedirects(r, ui_url('profile'))
814
        self.assertContains(r, 'disabled for your')
815

    
816
        cl_olduser.login(username='olduser@synnefo.org', password="password")
817
        r = cl_olduser.get(ui_url('profile'), follow=True)
818
        self.assertEqual(r.status_code, 200)
819
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
820
        self.assertContains(r, 'Your request is missing a unique token')
821
        cl_olduser.set_tokens(eppn="newusereppn")
822
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
823
        self.assertContains(r, 'already in use')
824
        cl_olduser.set_tokens(eppn="oldusereppn")
825
        r = cl_olduser.get(ui_url('login/shibboleth?'), follow=True)
826
        self.assertContains(r, 'Academic login enabled for this account')
827

    
828
        user = User.objects.get(email="olduser@synnefo.org")
829
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
830
        local_provider = user.get_auth_provider('local')
831
        self.assertEqual(shib_provider.get_remove_policy, True)
832
        self.assertEqual(local_provider.get_remove_policy, True)
833

    
834
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
835
                                                          'shibboleth',
836
                                                          academic_group,
837
                                                          remove=False)
838
        user.groups.add(academic_group)
839
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
840
        local_provider = user.get_auth_provider('local')
841
        self.assertEqual(shib_provider.get_remove_policy, False)
842
        self.assertEqual(local_provider.get_remove_policy, True)
843
        self.assertEqual(local_provider.get_login_policy, False)
844

    
845
        cl_olduser.logout()
846
        login_data = {'username': 'olduser@synnefo.org',
847
                      'password': 'password'}
848
        r = cl_olduser.post(ui_url('local'), login_data, follow=True)
849
        self.assertContains(r, "login/shibboleth'>Academic login")
850
        Group.objects.all().delete()
851

    
852

    
853
class TestAuthProvidersAPI(TestCase):
854
    """
855
    Test auth_providers module API
856
    """
857

    
858
    def tearDown(self):
859
        Group.objects.all().delete()
860

    
861
    @im_settings(IM_MODULES=['local', 'shibboleth'])
862
    def test_create(self):
863
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
864
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
865

    
866
        module = 'shibboleth'
867
        identifier = 'SHIB_UUID'
868
        provider_params = {
869
            'affiliation': 'UNIVERSITY',
870
            'info': {'age': 27}
871
        }
872
        provider = auth.get_provider(module, user2, identifier,
873
                                     **provider_params)
874
        provider.add_to_user()
875
        provider = auth.get_provider(module, user, identifier,
876
                                     **provider_params)
877
        provider.add_to_user()
878
        user.email_verified = True
879
        user.save()
880
        self.assertRaises(Exception, provider.add_to_user)
881
        provider = user.get_auth_provider(module, identifier)
882
        self.assertEqual(user.get_auth_provider(
883
            module, identifier)._instance.info.get('age'), 27)
884

    
885
        module = 'local'
886
        identifier = None
887
        provider_params = {'auth_backend': 'ldap', 'info':
888
                          {'office': 'A1'}}
889
        provider = auth.get_provider(module, user, identifier,
890
                                     **provider_params)
891
        provider.add_to_user()
892
        self.assertFalse(provider.get_add_policy)
893
        self.assertRaises(Exception, provider.add_to_user)
894

    
895
        shib = user.get_auth_provider('shibboleth',
896
                                      'SHIB_UUID')
897
        self.assertTrue(shib.get_remove_policy)
898

    
899
        local = user.get_auth_provider('local')
900
        self.assertTrue(local.get_remove_policy)
901

    
902
        local.remove_from_user()
903
        self.assertFalse(shib.get_remove_policy)
904
        self.assertRaises(Exception, shib.remove_from_user)
905

    
906
        provider = user.get_auth_providers()[0]
907
        self.assertRaises(Exception, provider.add_to_user)
908

    
909
    @im_settings(IM_MODULES=['local', 'shibboleth'])
910
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
911
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
912
                                                 'group2'])
913
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
914
                        CREATION_GROUPS_POLICY=['localgroup-create',
915
                                                'group-create'])
916
    def test_add_groups(self):
917
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
918
        provider = auth.get_provider('shibboleth', user, 'test123')
919
        provider.add_to_user()
920
        user = AstakosUser.objects.get()
921
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
922
                              sorted([u'group1', u'group2', u'group-create']))
923

    
924
        local = auth.get_provider('local', user)
925
        local.add_to_user()
926
        provider = user.get_auth_provider('shibboleth')
927
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
928
        provider.remove_from_user()
929
        user = AstakosUser.objects.get()
930
        self.assertEqual(len(user.get_auth_providers()), 1)
931
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
932
                              sorted([u'group-create', u'localgroup']))
933

    
934
        local = user.get_auth_provider('local')
935
        self.assertRaises(Exception, local.remove_from_user)
936
        provider = auth.get_provider('shibboleth', user, 'test123')
937
        provider.add_to_user()
938
        user = AstakosUser.objects.get()
939
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
940
                              sorted([u'group-create', u'group1', u'group2',
941
                               u'localgroup']))
942
        Group.objects.all().delete()
943

    
944

    
945

    
946
    @im_settings(IM_MODULES=['local', 'shibboleth'])
947
    def test_policies(self):
948
        group_old, created = Group.objects.get_or_create(name='olduser')
949

    
950
        astakos_settings.MODERATION_ENABLED = True
951
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
952
            ['academic-user']
953
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
954
            ['google-user']
955

    
956
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
957
        user.groups.add(group_old)
958
        user.add_auth_provider('local')
959

    
960
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
961
        user2.add_auth_provider('shibboleth', identifier='shibid')
962

    
963
        user3 = AstakosUser.objects.create(email="kpap3@synnefo.org")
964
        user3.groups.add(group_old)
965
        user3.add_auth_provider('local')
966
        user3.add_auth_provider('shibboleth', identifier='1234')
967

    
968
        self.assertTrue(user2.groups.get(name='academic-user'))
969
        self.assertFalse(user2.groups.filter(name='olduser').count())
970

    
971
        local = auth_providers.get_provider('local')
972
        self.assertTrue(local.get_add_policy)
973

    
974
        academic_group = Group.objects.get(name='academic-user')
975
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
976
                                                     academic_group,
977
                                                     exclusive=True,
978
                                                     add=False,
979
                                                     login=False)
980
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
981
                                                     academic_group,
982
                                                     exclusive=True,
983
                                                     login=False,
984
                                                     add=False)
985
        # no duplicate entry gets created
986
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
987

    
988
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
989
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
990
                                                     user2,
991
                                                     remove=False)
992
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
993

    
994
        local = auth_providers.get_provider('local', user2)
995
        google = auth_providers.get_provider('google', user2)
996
        shibboleth = auth_providers.get_provider('shibboleth', user2)
997
        self.assertTrue(shibboleth.get_login_policy)
998
        self.assertFalse(shibboleth.get_remove_policy)
999
        self.assertFalse(local.get_add_policy)
1000
        self.assertFalse(local.get_add_policy)
1001
        self.assertFalse(google.get_add_policy)
1002

    
1003
        user2.groups.remove(Group.objects.get(name='academic-user'))
1004
        self.assertTrue(local.get_add_policy)
1005
        self.assertTrue(google.get_add_policy)
1006
        user2.groups.add(Group.objects.get(name='academic-user'))
1007

    
1008
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1009
                                                     user2,
1010
                                                     exclusive=True,
1011
                                                     add=True)
1012
        self.assertTrue(local.get_add_policy)
1013
        self.assertTrue(google.get_add_policy)
1014

    
1015
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1016
        self.assertFalse(local.get_automoderate_policy)
1017
        self.assertFalse(google.get_automoderate_policy)
1018
        self.assertTrue(shibboleth.get_automoderate_policy)
1019

    
1020
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1021
                  'GOOGLE_ADD_GROUPS_POLICY']:
1022
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1023

    
1024

    
1025
    @shibboleth_settings(CREATE_POLICY=True)
1026
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1027
    def test_create_http(self):
1028
        # this should be wrapped inside a transaction
1029
        user = AstakosUser(email="test@test.com")
1030
        user.save()
1031
        provider = auth_providers.get_provider('shibboleth', user,
1032
                                               'test@academia.test')
1033
        provider.add_to_user()
1034
        user.get_auth_provider('shibboleth', 'test@academia.test')
1035
        provider = auth_providers.get_provider('local', user)
1036
        provider.add_to_user()
1037
        user.get_auth_provider('local')
1038

    
1039
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1040
        user = AstakosUser(email="test2@test.com")
1041
        user.save()
1042
        provider = auth_providers.get_provider('shibboleth', user,
1043
                                               'test@shibboleth.com',
1044
                                               **{'info': {'name':
1045
                                                                'User Test'}})
1046
        self.assertFalse(provider.get_create_policy)
1047
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1048
        self.assertTrue(provider.get_create_policy)
1049
        academic = provider.add_to_user()
1050

    
1051
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1052
    @shibboleth_settings(LIMIT_POLICY=2)
1053
    def test_policies(self):
1054
        user = get_local_user('kpap@synnefo.org')
1055
        user.add_auth_provider('shibboleth', identifier='1234')
1056
        user.add_auth_provider('shibboleth', identifier='12345')
1057

    
1058
        # default limit is 1
1059
        local = user.get_auth_provider('local')
1060
        self.assertEqual(local.get_add_policy, False)
1061

    
1062
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1063
        academic = user.get_auth_provider('shibboleth',
1064
                                          identifier='1234')
1065
        self.assertEqual(academic.get_add_policy, False)
1066
        newacademic = auth_providers.get_provider('shibboleth', user,
1067
                                                  identifier='123456')
1068
        self.assertEqual(newacademic.get_add_policy, True)
1069
        user.add_auth_provider('shibboleth', identifier='123456')
1070
        self.assertEqual(academic.get_add_policy, False)
1071
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1072

    
1073
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1074
    @shibboleth_settings(LIMIT_POLICY=2)
1075
    def test_messages(self):
1076
        user = get_local_user('kpap@synnefo.org')
1077
        user.add_auth_provider('shibboleth', identifier='1234')
1078
        user.add_auth_provider('shibboleth', identifier='12345')
1079
        provider = auth_providers.get_provider('shibboleth')
1080
        self.assertEqual(provider.get_message('title'), 'Academic')
1081
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1082
        # regenerate messages cache
1083
        provider = auth_providers.get_provider('shibboleth')
1084
        self.assertEqual(provider.get_message('title'), 'New title')
1085
        self.assertEqual(provider.get_message('login_title'),
1086
                         'New title LOGIN')
1087
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1088
        self.assertEqual(provider.get_module_icon,
1089
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1090
        self.assertEqual(provider.get_module_medium_icon,
1091
                         settings.MEDIA_URL +
1092
                         'im/auth/icons-medium/shibboleth.png')
1093

    
1094
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1095
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1096
        self.assertEqual(provider.get_method_details_msg,
1097
                         'Account: 12345')
1098
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1099
        self.assertEqual(provider.get_method_details_msg,
1100
                         'Account: 1234')
1101

    
1102
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1103
        self.assertEqual(provider.get_not_active_msg,
1104
                         "'Academic login' is disabled.")
1105

    
1106
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1107
    @shibboleth_settings(LIMIT_POLICY=2)
1108
    def test_templates(self):
1109
        user = get_local_user('kpap@synnefo.org')
1110
        user.add_auth_provider('shibboleth', identifier='1234')
1111
        user.add_auth_provider('shibboleth', identifier='12345')
1112

    
1113
        provider = auth_providers.get_provider('shibboleth')
1114
        self.assertEqual(provider.get_template('login'),
1115
                         'im/auth/shibboleth_login.html')
1116
        provider = auth_providers.get_provider('google')
1117
        self.assertEqual(provider.get_template('login'),
1118
                         'im/auth/generic_login.html')
1119

    
1120

    
1121
class TestActivationBackend(TestCase):
1122

    
1123
    def setUp(self):
1124
        # dummy call to pass through logging middleware
1125
        self.client.get(ui_url(''))
1126

    
1127
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1128
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1129
    def test_policies(self):
1130
        backend = activation_backends.get_backend()
1131

    
1132
        # email matches RE_USER_EMAIL_PATTERNS
1133
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1134
                               is_active=False, email_verified=False)
1135
        backend.handle_verification(user1, user1.verification_code)
1136
        self.assertEqual(user1.accepted_policy, 'email')
1137

    
1138
        # manually moderated
1139
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1140
                               is_active=False, email_verified=False)
1141

    
1142
        backend.handle_verification(user2, user2.verification_code)
1143
        self.assertEqual(user2.moderated, False)
1144
        backend.handle_moderation(user2)
1145
        self.assertEqual(user2.moderated, True)
1146
        self.assertEqual(user2.accepted_policy, 'manual')
1147

    
1148
        # autoaccept due to provider automoderate policy
1149
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1150
                               is_active=False, email_verified=False)
1151
        user3.auth_providers.all().delete()
1152
        user3.add_auth_provider('shibboleth', identifier='shib123')
1153
        backend.handle_verification(user3, user3.verification_code)
1154
        self.assertEqual(user3.moderated, True)
1155
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1156

    
1157
    @im_settings(MODERATION_ENABLED=False,
1158
                 MANAGERS=(('Manager',
1159
                            'manager@synnefo.org'),),
1160
                 HELPDESK=(('Helpdesk',
1161
                            'helpdesk@synnefo.org'),),
1162
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1163
    def test_without_moderation(self):
1164
        backend = activation_backends.get_backend()
1165
        form = backend.get_signup_form('local')
1166
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1167

    
1168
        user_data = {
1169
            'email': 'kpap@synnefo.org',
1170
            'first_name': 'Kostas Papas',
1171
            'password1': '123',
1172
            'password2': '123'
1173
        }
1174
        form = backend.get_signup_form('local', user_data)
1175
        user = form.save(commit=False)
1176
        form.store_user(user)
1177
        self.assertEqual(user.is_active, False)
1178
        self.assertEqual(user.email_verified, False)
1179

    
1180
        # step one, registration
1181
        result = backend.handle_registration(user)
1182
        user = AstakosUser.objects.get()
1183
        self.assertEqual(user.is_active, False)
1184
        self.assertEqual(user.email_verified, False)
1185
        self.assertTrue(user.verification_code)
1186
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1187
        backend.send_result_notifications(result, user)
1188
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1189
        self.assertEqual(len(mail.outbox), 1)
1190

    
1191
        # step two, verify email (automatically
1192
        # moderates/accepts user, since moderation is disabled)
1193
        user = AstakosUser.objects.get()
1194
        valid_code = user.verification_code
1195

    
1196
        # test invalid code
1197
        result = backend.handle_verification(user, valid_code)
1198
        backend.send_result_notifications(result, user)
1199
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1200
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1201
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1202
        # verification + activated + greeting = 3
1203
        self.assertEqual(len(mail.outbox), 3)
1204
        user = AstakosUser.objects.get()
1205
        self.assertEqual(user.is_active, True)
1206
        self.assertEqual(user.moderated, True)
1207
        self.assertTrue(user.moderated_at)
1208
        self.assertEqual(user.email_verified, True)
1209
        self.assertTrue(user.activation_sent)
1210

    
1211
    @im_settings(MODERATION_ENABLED=True,
1212
                 MANAGERS=(('Manager',
1213
                            'manager@synnefo.org'),),
1214
                 HELPDESK=(('Helpdesk',
1215
                            'helpdesk@synnefo.org'),),
1216
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1217
    def test_with_moderation(self):
1218

    
1219
        backend = activation_backends.get_backend()
1220
        form = backend.get_signup_form('local')
1221
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1222

    
1223
        user_data = {
1224
            'email': 'kpap@synnefo.org',
1225
            'first_name': 'Kostas Papas',
1226
            'password1': '123',
1227
            'password2': '123'
1228
        }
1229
        form = backend.get_signup_form(provider='local',
1230
                                       initial_data=user_data)
1231
        user = form.save(commit=False)
1232
        form.store_user(user)
1233
        self.assertEqual(user.is_active, False)
1234
        self.assertEqual(user.email_verified, False)
1235

    
1236
        # step one, registration
1237
        result = backend.handle_registration(user)
1238
        user = AstakosUser.objects.get()
1239
        self.assertEqual(user.is_active, False)
1240
        self.assertEqual(user.email_verified, False)
1241
        self.assertTrue(user.verification_code)
1242
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1243
        backend.send_result_notifications(result, user)
1244
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1245
        self.assertEqual(len(mail.outbox), 1)
1246

    
1247
        # step two, verifying email
1248
        user = AstakosUser.objects.get()
1249
        valid_code = user.verification_code
1250
        invalid_code = user.verification_code + 'invalid'
1251

    
1252
        # test invalid code
1253
        result = backend.handle_verification(user, invalid_code)
1254
        self.assertEqual(result.status, backend.Result.ERROR)
1255
        backend.send_result_notifications(result, user)
1256
        user = AstakosUser.objects.get()
1257
        self.assertEqual(user.is_active, False)
1258
        self.assertEqual(user.moderated, False)
1259
        self.assertEqual(user.moderated_at, None)
1260
        self.assertEqual(user.email_verified, False)
1261
        self.assertTrue(user.activation_sent)
1262

    
1263
        # test valid code
1264
        user = AstakosUser.objects.get()
1265
        result = backend.handle_verification(user, valid_code)
1266
        backend.send_result_notifications(result, user)
1267
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1268
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1269
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1270
        self.assertEqual(len(mail.outbox), 2)
1271
        user = AstakosUser.objects.get()
1272
        self.assertEqual(user.moderated, False)
1273
        self.assertEqual(user.moderated_at, None)
1274
        self.assertEqual(user.email_verified, True)
1275
        self.assertTrue(user.activation_sent)
1276

    
1277
        # test code reuse
1278
        result = backend.handle_verification(user, valid_code)
1279
        self.assertEqual(result.status, backend.Result.ERROR)
1280
        user = AstakosUser.objects.get()
1281
        self.assertEqual(user.is_active, False)
1282
        self.assertEqual(user.moderated, False)
1283
        self.assertEqual(user.moderated_at, None)
1284
        self.assertEqual(user.email_verified, True)
1285
        self.assertTrue(user.activation_sent)
1286

    
1287
        # valid code on verified user
1288
        user = AstakosUser.objects.get()
1289
        valid_code = user.verification_code
1290
        result = backend.handle_verification(user, valid_code)
1291
        self.assertEqual(result.status, backend.Result.ERROR)
1292

    
1293
        # step three, moderation user
1294
        user = AstakosUser.objects.get()
1295
        result = backend.handle_moderation(user)
1296
        backend.send_result_notifications(result, user)
1297

    
1298
        user = AstakosUser.objects.get()
1299
        self.assertEqual(user.is_active, True)
1300
        self.assertEqual(user.moderated, True)
1301
        self.assertTrue(user.moderated_at)
1302
        self.assertEqual(user.email_verified, True)
1303
        self.assertTrue(user.activation_sent)
1304

    
1305

    
1306
class TestWebloginRedirect(TestCase):
1307

    
1308
    @with_settings(settings, COOKIE_DOMAIN='.astakos.synnefo.org')
1309
    def test_restricts_domains(self):
1310
        get_local_user('user1@synnefo.org')
1311

    
1312
        # next url construct helpers
1313
        weblogin = lambda nxt: reverse('weblogin') + '?next=%s' % nxt
1314
        weblogin_quoted = lambda nxt: reverse('weblogin') + '?next=%s' % \
1315
            urllib.quote_plus(nxt)
1316

    
1317
        # common cases
1318
        invalid_domain = weblogin("https://www.invaliddomain.synnefo.org")
1319
        invalid_scheme = weblogin("customscheme://localhost")
1320
        invalid_scheme_with_valid_domain = \
1321
                weblogin("http://www.invaliddomain.com")
1322
        valid_scheme = weblogin("pithos://localhost/")
1323
        # to be used in assertRedirects
1324
        valid_scheme_quoted = weblogin_quoted("pithos://localhost/")
1325

    
1326
        # not authenticated, redirects to login which contains next param with
1327
        # additional nested quoted next params
1328
        r = self.client.get(valid_scheme, follow=True)
1329
        login_redirect = reverse('login') + '?next=' + \
1330
            urllib.quote_plus("http://testserver" + valid_scheme_quoted)
1331
        self.assertRedirects(r, login_redirect)
1332

    
1333
        # authenticate client
1334
        self.client.login(username="user1@synnefo.org", password="password")
1335

    
1336
        # valid scheme
1337
        r = self.client.get(valid_scheme, follow=True)
1338
        url = r.redirect_chain[1][0]
1339
        # scheme preserved
1340
        self.assertTrue(url.startswith('pithos://localhost/'))
1341
        # redirect contains token param
1342
        params = urlparse.urlparse(url.replace('pithos', 'https'),
1343
                                   scheme='https').query
1344
        params = urlparse.parse_qs(params)
1345
        self.assertEqual(params['token'][0],
1346
                         AstakosUser.objects.get().auth_token)
1347
        # does not contain uuid
1348
        # reverted for 0.14.2 to support old pithos desktop clients
1349
        #self.assertFalse('uuid' in params)
1350

    
1351
        # invalid cases
1352
        r = self.client.get(invalid_scheme, follow=True)
1353
        self.assertEqual(r.status_code, 403)
1354

    
1355
        r = self.client.get(invalid_scheme_with_valid_domain, follow=True)
1356
        self.assertEqual(r.status_code, 403)
1357

    
1358
        r = self.client.get(invalid_domain, follow=True)
1359
        self.assertEqual(r.status_code, 403)