Revision 1ac3349d

/dev/null
1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from contextlib import contextmanager
34

  
35
import copy
36
import datetime
37
import functools
38

  
39
from snf_django.utils.testing import with_settings, override_settings, assertIn
40

  
41
from django.test import Client
42
from django.test import TransactionTestCase as TestCase
43
from django.core import mail
44
from django.http import SimpleCookie, HttpRequest, QueryDict
45
from django.utils.importlib import import_module
46
from django.utils import simplejson as json
47

  
48
from astakos.im.activation_backends import *
49
from astakos.im.target.shibboleth import Tokens as ShibbolethTokens
50
from astakos.im.models import *
51
from astakos.im import functions
52
from astakos.im import settings as astakos_settings
53
from astakos.im import forms
54
from astakos.im import activation_backends
55

  
56
from urllib import quote
57
from datetime import timedelta
58

  
59
from astakos.im import messages
60
from astakos.im import auth_providers
61
from astakos.im import quotas
62
from astakos.im import resources
63

  
64
from django.conf import settings
65

  
66

  
67
# set some common settings
68
astakos_settings.EMAILCHANGE_ENABLED = True
69
astakos_settings.RECAPTCHA_ENABLED = False
70

  
71
settings.LOGGING_SETUP['disable_existing_loggers'] = False
72

  
73
# shortcut decorators to override provider settings
74
# e.g. shibboleth_settings(ENABLED=True) will set
75
# ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_ENABLED = True in global synnefo settings
76
prefixes = {'providers': 'AUTH_PROVIDER_',
77
            'shibboleth': 'ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_',
78
            'local': 'ASTAKOS_AUTH_PROVIDER_LOCAL_'}
79
im_settings = functools.partial(with_settings, astakos_settings)
80
shibboleth_settings = functools.partial(with_settings,
81
                                        settings,
82
                                        prefix=prefixes['shibboleth'])
83
localauth_settings = functools.partial(with_settings, settings,
84
                                       prefix=prefixes['local'])
85

  
86

  
87
class AstakosTestClient(Client):
88
    pass
89

  
90

  
91
class ShibbolethClient(AstakosTestClient):
92
    """
93
    A shibboleth agnostic client.
94
    """
95
    VALID_TOKENS = filter(lambda x: not x.startswith("_"),
96
                          dir(ShibbolethTokens))
97

  
98
    def __init__(self, *args, **kwargs):
99
        self.tokens = kwargs.pop('tokens', {})
100
        super(ShibbolethClient, self).__init__(*args, **kwargs)
101

  
102
    def set_tokens(self, **kwargs):
103
        for key, value in kwargs.iteritems():
104
            key = 'SHIB_%s' % key.upper()
105
            if not key in self.VALID_TOKENS:
106
                raise Exception('Invalid shibboleth token')
107

  
108
            self.tokens[key] = value
109

  
110
    def unset_tokens(self, *keys):
111
        for key in keys:
112
            key = 'SHIB_%s' % param.upper()
113
            if key in self.tokens:
114
                del self.tokens[key]
115

  
116
    def reset_tokens(self):
117
        self.tokens = {}
118

  
119
    def get_http_token(self, key):
120
        http_header = getattr(ShibbolethTokens, key)
121
        return http_header
122

  
123
    def request(self, **request):
124
        """
125
        Transform valid shibboleth tokens to http headers
126
        """
127
        for token, value in self.tokens.iteritems():
128
            request[self.get_http_token(token)] = value
129

  
130
        for param in request.keys():
131
            key = 'SHIB_%s' % param.upper()
132
            if key in self.VALID_TOKENS:
133
                request[self.get_http_token(key)] = request[param]
134
                del request[param]
135

  
136
        return super(ShibbolethClient, self).request(**request)
137

  
138

  
139
def get_user_client(username, password="password"):
140
    client = Client()
141
    client.login(username=username, password=password)
142
    return client
143

  
144

  
145
def get_local_user(username, **kwargs):
146
        try:
147
            return AstakosUser.objects.get(email=username)
148
        except:
149
            user_params = {
150
                'username': username,
151
                'email': username,
152
                'is_active': True,
153
                'activation_sent': datetime.now(),
154
                'email_verified': True
155
            }
156
            user_params.update(kwargs)
157
            user = AstakosUser(**user_params)
158
            user.set_password(kwargs.get('password', 'password'))
159
            user.renew_verification_code()
160
            user.save()
161
            user.add_auth_provider('local', auth_backend='astakos')
162
            if kwargs.get('is_active', True):
163
                user.is_active = True
164
            else:
165
                user.is_active = False
166
            user.save()
167
            return user
168

  
169

  
170
def get_mailbox(email):
171
    mails = []
172
    for sent_email in mail.outbox:
173
        for recipient in sent_email.recipients():
174
            if email in recipient:
175
                mails.append(sent_email)
176
    return mails
177

  
178

  
179
class ShibbolethTests(TestCase):
180
    """
181
    Testing shibboleth authentication.
182
    """
183

  
184
    fixtures = ['groups']
185

  
186
    def setUp(self):
187
        self.client = ShibbolethClient()
188
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
189
        astakos_settings.MODERATION_ENABLED = True
190

  
191
    @im_settings(FORCE_PROFILE_UPDATE=False)
192
    def test_create_account(self):
193

  
194
        client = ShibbolethClient()
195

  
196
        # shibboleth views validation
197
        # eepn required
198
        r = client.get('/im/login/shibboleth?', follow=True)
199
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
200
            'domain': astakos_settings.BASEURL,
201
            'contact_email': settings.CONTACT_EMAIL
202
        })
203
        client.set_tokens(eppn="kpapeppn")
204

  
205
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
206
        # shibboleth user info required
207
        r = client.get('/im/login/shibboleth?', follow=True)
208
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
209
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
210

  
211
        # shibboleth logged us in
212
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
213
                          cn="Kostas Papadimitriou",
214
                          ep_affiliation="Test Affiliation")
215
        r = client.get('/im/login/shibboleth?', follow=True)
216
        token = PendingThirdPartyUser.objects.get().token
217
        self.assertRedirects(r, '/im/signup?third_party_token=%s' % token)
218
        self.assertEqual(r.status_code, 200)
219

  
220
        # a new pending user created
221
        pending_user = PendingThirdPartyUser.objects.get(
222
            third_party_identifier="kpapeppn")
223
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
224
        # keep the token for future use
225
        token = pending_user.token
226
        # from now on no shibboleth headers are sent to the server
227
        client.reset_tokens()
228

  
229
        # this is the old way, it should fail, to avoid pending user take over
230
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
231
        self.assertEqual(r.status_code, 404)
232

  
233
        # this is the signup unique url associated with the pending user
234
        # created
235
        r = client.get('/im/signup/?third_party_token=%s' % token)
236
        identifier = pending_user.third_party_identifier
237
        post_data = {'third_party_identifier': identifier,
238
                     'first_name': 'Kostas',
239
                     'third_party_token': token,
240
                     'last_name': 'Mitroglou',
241
                     'provider': 'shibboleth'}
242

  
243
        signup_url = reverse('signup')
244

  
245
        # invlid email
246
        post_data['email'] = 'kpap'
247
        r = client.post(signup_url, post_data)
248
        self.assertContains(r, token)
249

  
250
        # existing email
251
        existing_user = get_local_user('test@test.com')
252
        post_data['email'] = 'test@test.com'
253
        r = client.post(signup_url, post_data)
254
        self.assertContains(r, messages.EMAIL_USED)
255
        existing_user.delete()
256

  
257
        # and finally a valid signup
258
        post_data['email'] = 'kpap@synnefo.org'
259
        r = client.post(signup_url, post_data, follow=True)
260
        self.assertContains(r, messages.VERIFICATION_SENT)
261

  
262
        # entires commited as expected
263
        self.assertEqual(AstakosUser.objects.count(), 1)
264
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
265
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
266

  
267
        # provider info stored
268
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
269
        self.assertEqual(provider.affiliation, 'Test Affiliation')
270
        self.assertEqual(provider.info, {u'email': u'kpap@synnefo.org',
271
                                         u'eppn': u'kpapeppn',
272
                                         u'name': u'Kostas Papadimitriou'})
273

  
274
        # login (not activated yet)
275
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
276
                          cn="Kostas Papadimitriou", )
277
        r = client.get("/im/login/shibboleth?", follow=True)
278
        self.assertContains(r, 'is pending moderation')
279

  
280
        # admin activates the user
281
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
282
        backend = activation_backends.get_backend()
283
        activation_result = backend.verify_user(u, u.verification_code)
284
        activation_result = backend.accept_user(u)
285
        self.assertFalse(activation_result.is_error())
286
        backend.send_result_notifications(activation_result, u)
287
        self.assertEqual(u.is_active, True)
288

  
289
        # we see our profile
290
        r = client.get("/im/login/shibboleth?", follow=True)
291
        self.assertRedirects(r, '/im/landing')
292
        self.assertEqual(r.status_code, 200)
293

  
294
    def test_existing(self):
295
        """
296
        Test adding of third party login to an existing account
297
        """
298

  
299
        # this is our existing user
300
        existing_user = get_local_user('kpap@synnefo.org')
301
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
302
        existing_inactive.is_active = False
303
        existing_inactive.save()
304

  
305
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
306
        existing_unverified.is_active = False
307
        existing_unverified.activation_sent = None
308
        existing_unverified.email_verified = False
309
        existing_unverified.is_verified = False
310
        existing_unverified.save()
311

  
312
        client = ShibbolethClient()
313
        # shibboleth logged us in, notice that we use different email
314
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
315
                          cn="Kostas Papadimitriou", )
316
        r = client.get("/im/login/shibboleth?", follow=True)
317

  
318
        # a new pending user created
319
        pending_user = PendingThirdPartyUser.objects.get()
320
        token = pending_user.token
321
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
322
        pending_key = pending_user.token
323
        client.reset_tokens()
324
        self.assertRedirects(r, "/im/signup?third_party_token=%s" % token)
325

  
326
        form = r.context['form']
327
        signupdata = copy.copy(form.initial)
328
        signupdata['email'] = 'kpap@synnefo.org'
329
        signupdata['third_party_token'] = token
330
        signupdata['provider'] = 'shibboleth'
331
        signupdata.pop('id', None)
332

  
333
        # the email exists to another user
334
        r = client.post("/im/signup", signupdata)
335
        self.assertContains(r, "There is already an account with this email "
336
                               "address")
337
        # change the case, still cannot create
338
        signupdata['email'] = 'KPAP@synnefo.org'
339
        r = client.post("/im/signup", signupdata)
340
        self.assertContains(r, "There is already an account with this email "
341
                               "address")
342
        # inactive user
343
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
344
        r = client.post("/im/signup", signupdata)
345
        self.assertContains(r, "There is already an account with this email "
346
                               "address")
347

  
348
        # unverified user, this should pass, old entry will be deleted
349
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
350
        r = client.post("/im/signup", signupdata)
351

  
352
        post_data = {'password': 'password',
353
                     'username': 'kpap@synnefo.org'}
354
        r = client.post('/im/local', post_data, follow=True)
355
        self.assertTrue(r.context['request'].user.is_authenticated())
356
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
357
                          cn="Kostas Papadimitriou", )
358
        r = client.get("/im/login/shibboleth?", follow=True)
359
        self.assertContains(r, "enabled for this account")
360
        client.reset_tokens()
361

  
362
        user = existing_user
363
        self.assertTrue(user.has_auth_provider('shibboleth'))
364
        self.assertTrue(user.has_auth_provider('local',
365
                                               auth_backend='astakos'))
366
        client.logout()
367

  
368
        # look Ma, i can login with both my shibboleth and local account
369
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
370
                          cn="Kostas Papadimitriou")
371
        r = client.get("/im/login/shibboleth?", follow=True)
372
        self.assertTrue(r.context['request'].user.is_authenticated())
373
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
374
        self.assertRedirects(r, '/im/landing')
375
        self.assertEqual(r.status_code, 200)
376
        client.logout()
377
        client.reset_tokens()
378

  
379
        # logged out
380
        r = client.get("/im/profile", follow=True)
381
        self.assertFalse(r.context['request'].user.is_authenticated())
382

  
383
        # login with local account also works
384
        post_data = {'password': 'password',
385
                     'username': 'kpap@synnefo.org'}
386
        r = self.client.post('/im/local', post_data, follow=True)
387
        self.assertTrue(r.context['request'].user.is_authenticated())
388
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
389
        self.assertRedirects(r, '/im/landing')
390
        self.assertEqual(r.status_code, 200)
391

  
392
        # cannot add the same eppn
393
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
394
                          cn="Kostas Papadimitriou", )
395
        r = client.get("/im/login/shibboleth?", follow=True)
396
        self.assertRedirects(r, '/im/landing')
397
        self.assertTrue(r.status_code, 200)
398
        self.assertEquals(existing_user.auth_providers.count(), 2)
399

  
400
        # only one allowed by default
401
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
402
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
403
        prov = auth_providers.get_provider('shibboleth')
404
        r = client.get("/im/login/shibboleth?", follow=True)
405
        self.assertContains(r, "Failed to add")
406
        self.assertRedirects(r, '/im/profile')
407
        self.assertTrue(r.status_code, 200)
408
        self.assertEquals(existing_user.auth_providers.count(), 2)
409
        client.logout()
410
        client.reset_tokens()
411

  
412
        # cannot login with another eppn
413
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppninvalid",
414
                          cn="Kostas Papadimitriou")
415
        r = client.get("/im/login/shibboleth?", follow=True)
416
        self.assertFalse(r.context['request'].user.is_authenticated())
417

  
418
        # cannot
419

  
420
        # lets remove local password
421
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
422
                                       email="kpap@synnefo.org")
423
        remove_local_url = user.get_auth_provider('local').get_remove_url
424
        remove_shibbo_url = user.get_auth_provider('shibboleth',
425
                                                   'kpapeppn').get_remove_url
426
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
427
                          cn="Kostas Papadimtriou")
428
        r = client.get("/im/login/shibboleth?", follow=True)
429
        client.reset_tokens()
430

  
431
        # TODO: this view should use POST
432
        r = client.get(remove_local_url)
433
        # 2 providers left
434
        self.assertEqual(user.auth_providers.count(), 1)
435
        # cannot remove last provider
436
        r = client.get(remove_shibbo_url)
437
        self.assertEqual(r.status_code, 403)
438
        self.client.logout()
439

  
440
        # cannot login using local credentials (notice we use another client)
441
        post_data = {'password': 'password',
442
                     'username': 'kpap@synnefo.org'}
443
        r = self.client.post('/im/local', post_data, follow=True)
444
        self.assertFalse(r.context['request'].user.is_authenticated())
445

  
446
        # we can reenable the local provider by setting a password
447
        r = client.get("/im/password_change", follow=True)
448
        r = client.post("/im/password_change", {'new_password1': '111',
449
                                                'new_password2': '111'},
450
                        follow=True)
451
        user = r.context['request'].user
452
        self.assertTrue(user.has_auth_provider('local'))
453
        self.assertTrue(user.has_auth_provider('shibboleth'))
454
        self.assertTrue(user.check_password('111'))
455
        self.assertTrue(user.has_usable_password())
456
        self.client.logout()
457

  
458
        # now we can login
459
        post_data = {'password': '111',
460
                     'username': 'kpap@synnefo.org'}
461
        r = self.client.post('/im/local', post_data, follow=True)
462
        self.assertTrue(r.context['request'].user.is_authenticated())
463

  
464
        client.reset_tokens()
465

  
466
        # we cannot take over another shibboleth identifier
467
        user2 = get_local_user('another@synnefo.org')
468
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
469
        # login
470
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
471
                          cn="Kostas Papadimitriou")
472
        r = client.get("/im/login/shibboleth?", follow=True)
473
        # try to assign existing shibboleth identifier of another user
474
        client.set_tokens(mail="kpap_second@shibboleth.gr",
475
                          eppn="existingeppn", cn="Kostas Papadimitriou")
476
        r = client.get("/im/login/shibboleth?", follow=True)
477
        self.assertContains(r, "this account is already assigned")
478

  
479

  
480
class TestLocal(TestCase):
481

  
482
    fixtures = ['groups']
483

  
484
    def setUp(self):
485
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
486
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
487
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
488
        settings.ASTAKOS_MODERATION_ENABLED = True
489

  
490
    def tearDown(self):
491
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
492

  
493
    def test_no_moderation(self):
494
        # disable moderation
495
        astakos_settings.MODERATION_ENABLED = False
496

  
497
        # create a new user
498
        r = self.client.get("/im/signup")
499
        self.assertEqual(r.status_code, 200)
500
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
501
                'password2': 'password', 'first_name': 'Kostas',
502
                'last_name': 'Mitroglou', 'provider': 'local'}
503
        r = self.client.post("/im/signup", data)
504

  
505
        # user created
506
        self.assertEqual(AstakosUser.objects.count(), 1)
507
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
508
                                       email="kpap@synnefo.org")
509
        self.assertEqual(user.username, 'kpap@synnefo.org')
510
        self.assertEqual(user.has_auth_provider('local'), True)
511
        self.assertFalse(user.is_active)
512

  
513
        # user (but not admin) gets notified
514
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
515
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
516
        astakos_settings.MODERATION_ENABLED = True
517

  
518
    def test_email_case(self):
519
        data = {
520
            'email': 'kPap@synnefo.org',
521
            'password1': '1234',
522
            'password2': '1234'
523
        }
524

  
525
        form = forms.LocalUserCreationForm(data)
526
        self.assertTrue(form.is_valid())
527
        user = form.save()
528
        form.store_user(user, {})
529

  
530
        u = AstakosUser.objects.get()
531
        self.assertEqual(u.email, 'kPap@synnefo.org')
532
        self.assertEqual(u.username, 'kpap@synnefo.org')
533
        u.is_active = True
534
        u.email_verified = True
535
        u.save()
536

  
537
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
538
        login = forms.LoginForm(data=data)
539
        self.assertTrue(login.is_valid())
540

  
541
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
542
        login = forms.LoginForm(data=data)
543
        self.assertTrue(login.is_valid())
544

  
545
        data = {
546
            'email': 'kpap@synnefo.org',
547
            'password1': '1234',
548
            'password2': '1234'
549
        }
550
        form = forms.LocalUserCreationForm(data)
551
        self.assertFalse(form.is_valid())
552

  
553
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
554
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
555
    def test_local_provider(self):
556
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
557

  
558
        # create a user
559
        r = self.client.get("/im/signup")
560
        self.assertEqual(r.status_code, 200)
561
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
562
                'password2': 'password', 'first_name': 'Kostas',
563
                'last_name': 'Mitroglou', 'provider': 'local'}
564
        r = self.client.post("/im/signup", data)
565

  
566
        # user created
567
        self.assertEqual(AstakosUser.objects.count(), 1)
568
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
569
                                       email="kpap@synnefo.org")
570
        self.assertEqual(user.username, 'kpap@synnefo.org')
571
        self.assertEqual(user.has_auth_provider('local'), True)
572
        self.assertFalse(user.is_active)  # not activated
573
        self.assertFalse(user.email_verified)  # not verified
574
        self.assertTrue(user.activation_sent)  # activation automatically sent
575
        self.assertFalse(user.moderated)
576
        self.assertFalse(user.email_verified)
577

  
578
        # admin gets notified and activates the user from the command line
579
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
580
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
581
                                           'password': 'password'},
582
                             follow=True)
583
        self.assertContains(r, messages.VERIFICATION_SENT)
584
        backend = activation_backends.get_backend()
585

  
586
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
587
        backend.send_user_verification_email(user)
588

  
589
        # user activation fields updated and user gets notified via email
590
        user = AstakosUser.objects.get(pk=user.pk)
591
        self.assertTrue(user.activation_sent)
592
        self.assertFalse(user.email_verified)
593
        self.assertFalse(user.is_active)
594
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
595

  
596
        # user forgot she got registered and tries to submit registration
597
        # form. Notice the upper case in email
598
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
599
                'password2': 'password', 'first_name': 'Kostas',
600
                'last_name': 'Mitroglou', 'provider': 'local'}
601
        r = self.client.post("/im/signup", data, follow=True)
602
        self.assertRedirects(r, reverse('index'))
603
        self.assertContains(r, messages.VERIFICATION_SENT)
604

  
605
        user = AstakosUser.objects.get()
606
        # previous user replaced
607
        self.assertTrue(user.activation_sent)
608
        self.assertFalse(user.email_verified)
609
        self.assertFalse(user.is_active)
610
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
611

  
612
        # hmmm, email exists; lets request a password change
613
        r = self.client.get('/im/local/password_reset')
614
        self.assertEqual(r.status_code, 200)
615
        data = {'email': 'kpap@synnefo.org'}
616
        r = self.client.post('/im/local/password_reset', data, follow=True)
617
        # she can't because account is not active yet
618
        self.assertContains(r, 'pending activation')
619

  
620
        # moderation is enabled and an activation email has already been sent
621
        # so user can trigger resend of the activation email
622
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
623
        self.assertContains(r, 'has been sent to your email address.')
624
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
625

  
626
        # also she cannot login
627
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
628
        r = self.client.post('/im/local', data, follow=True)
629
        self.assertContains(r, 'Resend activation')
630
        self.assertFalse(r.context['request'].user.is_authenticated())
631
        self.assertFalse('_pithos2_a' in self.client.cookies)
632

  
633
        # user sees the message and resends activation
634
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
635
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
636

  
637
        # logged in user cannot activate another account
638
        tmp_user = get_local_user("test_existing_user@synnefo.org")
639
        tmp_client = Client()
640
        tmp_client.login(username="test_existing_user@synnefo.org",
641
                         password="password")
642
        r = tmp_client.get(user.get_activation_url(), follow=True)
643
        self.assertContains(r, messages.LOGGED_IN_WARNING)
644

  
645
        r = self.client.get(user.get_activation_url(), follow=True)
646
        # previous code got invalidated
647
        self.assertEqual(r.status_code, 404)
648

  
649
        user = AstakosUser.objects.get(pk=user.pk)
650
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
651
        r = self.client.get(user.get_activation_url(), follow=True)
652
        self.assertRedirects(r, reverse('index'))
653
        # user sees that account is pending approval from admins
654
        self.assertContains(r, messages.NOTIFICATION_SENT)
655
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
656

  
657
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
658
        result = backend.handle_moderation(user)
659
        backend.send_result_notifications(result, user)
660
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
661
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
662

  
663
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
664
        r = self.client.get('/im/profile', follow=True)
665
        self.assertFalse(r.context['request'].user.is_authenticated())
666
        self.assertFalse('_pithos2_a' in self.client.cookies)
667
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
668

  
669
        user = AstakosUser.objects.get(pk=user.pk)
670
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
671
                                           'password': 'password'},
672
                             follow=True)
673
        # user activated and logged in, token cookie set
674
        self.assertTrue(r.context['request'].user.is_authenticated())
675
        self.assertTrue('_pithos2_a' in self.client.cookies)
676
        cookies = self.client.cookies
677
        self.assertTrue(quote(user.auth_token) in
678
                        cookies.get('_pithos2_a').value)
679
        r = self.client.get('/im/logout', follow=True)
680
        r = self.client.get('/im/')
681
        # user logged out, token cookie removed
682
        self.assertFalse(r.context['request'].user.is_authenticated())
683
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
684

  
685
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
686
        del self.client.cookies['_pithos2_a']
687

  
688
        # user can login
689
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
690
                                           'password': 'password'},
691
                             follow=True)
692
        self.assertTrue(r.context['request'].user.is_authenticated())
693
        self.assertTrue('_pithos2_a' in self.client.cookies)
694
        cookies = self.client.cookies
695
        self.assertTrue(quote(user.auth_token) in
696
                        cookies.get('_pithos2_a').value)
697
        self.client.get('/im/logout', follow=True)
698

  
699
        # user forgot password
700
        old_pass = user.password
701
        r = self.client.get('/im/local/password_reset')
702
        self.assertEqual(r.status_code, 200)
703
        r = self.client.post('/im/local/password_reset', {'email':
704
                                                          'kpap@synnefo.org'})
705
        self.assertEqual(r.status_code, 302)
706
        # email sent
707
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
708

  
709
        # user visits change password link
710
        user = AstakosUser.objects.get(pk=user.pk)
711
        r = self.client.get(user.get_password_reset_url())
712
        r = self.client.post(user.get_password_reset_url(),
713
                             {'new_password1': 'newpass',
714
                              'new_password2': 'newpass'})
715

  
716
        user = AstakosUser.objects.get(pk=user.pk)
717
        self.assertNotEqual(old_pass, user.password)
718

  
719
        # old pass is not usable
720
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
721
                                           'password': 'password'})
722
        self.assertContains(r, 'Please enter a correct username and password')
723
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
724
                                           'password': 'newpass'},
725
                             follow=True)
726
        self.assertTrue(r.context['request'].user.is_authenticated())
727
        self.client.logout()
728

  
729
        # tests of special local backends
730
        user = AstakosUser.objects.get(pk=user.pk)
731
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
732
        user.save()
733

  
734
        # non astakos local backends do not support password reset
735
        r = self.client.get('/im/local/password_reset')
736
        self.assertEqual(r.status_code, 200)
737
        r = self.client.post('/im/local/password_reset', {'email':
738
                                                          'kpap@synnefo.org'})
739
        # she can't because account is not active yet
740
        self.assertContains(r, "Changing password is not")
741

  
742

  
743
class UserActionsTests(TestCase):
744

  
745
    def test_email_change(self):
746
        # to test existing email validation
747
        get_local_user('existing@synnefo.org')
748

  
749
        # local user
750
        user = get_local_user('kpap@synnefo.org')
751

  
752
        # login as kpap
753
        self.client.login(username='kpap@synnefo.org', password='password')
754
        r = self.client.get('/im/profile', follow=True)
755
        user = r.context['request'].user
756
        self.assertTrue(user.is_authenticated())
757

  
758
        # change email is enabled
759
        r = self.client.get('/im/email_change')
760
        self.assertEqual(r.status_code, 200)
761
        self.assertFalse(user.email_change_is_pending())
762

  
763
        # request email change to an existing email fails
764
        data = {'new_email_address': 'existing@synnefo.org'}
765
        r = self.client.post('/im/email_change', data)
766
        self.assertContains(r, messages.EMAIL_USED)
767

  
768
        # proper email change
769
        data = {'new_email_address': 'kpap@gmail.com'}
770
        r = self.client.post('/im/email_change', data, follow=True)
771
        self.assertRedirects(r, '/im/profile')
772
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
773
        change1 = EmailChange.objects.get()
774

  
775
        # user sees a warning
776
        r = self.client.get('/im/email_change')
777
        self.assertEqual(r.status_code, 200)
778
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
779
        self.assertTrue(user.email_change_is_pending())
780

  
781
        # link was sent
782
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
783
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
784

  
785
        # proper email change
786
        data = {'new_email_address': 'kpap@yahoo.com'}
787
        r = self.client.post('/im/email_change', data, follow=True)
788
        self.assertRedirects(r, '/im/profile')
789
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
790
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
791
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
792
        change2 = EmailChange.objects.get()
793

  
794
        r = self.client.get(change1.get_url())
795
        self.assertEquals(r.status_code, 302)
796
        self.client.logout()
797

  
798
        r = self.client.post('/im/local?next=' + change2.get_url(),
799
                             {'username': 'kpap@synnefo.org',
800
                              'password': 'password',
801
                              'next': change2.get_url()},
802
                             follow=True)
803
        self.assertRedirects(r, '/im/profile')
804
        user = r.context['request'].user
805
        self.assertEquals(user.email, 'kpap@yahoo.com')
806
        self.assertEquals(user.username, 'kpap@yahoo.com')
807

  
808
        self.client.logout()
809
        r = self.client.post('/im/local?next=' + change2.get_url(),
810
                             {'username': 'kpap@synnefo.org',
811
                              'password': 'password',
812
                              'next': change2.get_url()},
813
                             follow=True)
814
        self.assertContains(r, "Please enter a correct username and password")
815
        self.assertEqual(user.emailchanges.count(), 0)
816

  
817

  
818
class TestAuthProviderViews(TestCase):
819

  
820
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
821
                         AUTOMODERATE_POLICY=True)
822
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
823
                 FORCE_PROFILE_UPDATE=False)
824
    def test_user(self):
825
        Profile = AuthProviderPolicyProfile
826
        Pending = PendingThirdPartyUser
827
        User = AstakosUser
828

  
829
        User.objects.create(email="newuser@synnefo.org")
830
        get_local_user("olduser@synnefo.org")
831
        cl_olduser = ShibbolethClient()
832
        get_local_user("olduser2@synnefo.org")
833
        ShibbolethClient()
834
        cl_newuser = ShibbolethClient()
835
        cl_newuser2 = Client()
836

  
837
        academic_group, created = Group.objects.get_or_create(
838
            name='academic-login')
839
        academic_users = academic_group.user_set
840
        assert created
841
        policy_only_academic = Profile.objects.add_policy('academic_strict',
842
                                                          'shibboleth',
843
                                                          academic_group,
844
                                                          exclusive=True,
845
                                                          login=False,
846
                                                          add=False)
847

  
848
        # new academic user
849
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
850
        cl_newuser.set_tokens(eppn="newusereppn")
851
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
852
        pending = Pending.objects.get()
853
        identifier = pending.third_party_identifier
854
        signup_data = {'third_party_identifier': identifier,
855
                       'first_name': 'Academic',
856
                       'third_party_token': pending.token,
857
                       'last_name': 'New User',
858
                       'provider': 'shibboleth'}
859
        r = cl_newuser.post('/im/signup', signup_data)
860
        self.assertContains(r, "This field is required", )
861
        signup_data['email'] = 'olduser@synnefo.org'
862
        r = cl_newuser.post('/im/signup', signup_data)
863
        self.assertContains(r, "already an account with this email", )
864
        signup_data['email'] = 'newuser@synnefo.org'
865
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
866
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
867
        self.assertEqual(r.status_code, 404)
868
        newuser = User.objects.get(email="newuser@synnefo.org")
869
        activation_link = newuser.get_activation_url()
870
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
871

  
872
        # new non-academic user
873
        signup_data = {'first_name': 'Non Academic',
874
                       'last_name': 'New User',
875
                       'provider': 'local',
876
                       'password1': 'password',
877
                       'password2': 'password'}
878
        signup_data['email'] = 'olduser@synnefo.org'
879
        r = cl_newuser2.post('/im/signup', signup_data)
880
        self.assertContains(r, 'There is already an account with this '
881
                               'email address')
882
        signup_data['email'] = 'newuser@synnefo.org'
883
        r = cl_newuser2.post('/im/signup/', signup_data)
884
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
885
        r = self.client.get(activation_link, follow=True)
886
        self.assertEqual(r.status_code, 404)
887
        newuser = User.objects.get(email="newuser@synnefo.org")
888
        self.assertTrue(newuser.activation_sent)
889

  
890
        # activation sent, user didn't open verification url so additional
891
        # registrations invalidate the previous signups.
892
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
893
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
894
        pending = Pending.objects.get()
895
        identifier = pending.third_party_identifier
896
        signup_data = {'third_party_identifier': identifier,
897
                       'first_name': 'Academic',
898
                       'third_party_token': pending.token,
899
                       'last_name': 'New User',
900
                       'provider': 'shibboleth'}
901
        signup_data['email'] = 'newuser@synnefo.org'
902
        r = cl_newuser.post('/im/signup', signup_data)
903
        self.assertEqual(r.status_code, 302)
904
        newuser = User.objects.get(email="newuser@synnefo.org")
905
        self.assertTrue(newuser.activation_sent)
906
        activation_link = newuser.get_activation_url()
907
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
908
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
909
        self.assertRedirects(r, '/im/landing')
910
        newuser = User.objects.get(email="newuser@synnefo.org")
911
        self.assertEqual(newuser.is_active, True)
912
        self.assertEqual(newuser.email_verified, True)
913
        cl_newuser.logout()
914

  
915
        # cannot reactivate if suspended
916
        newuser.is_active = False
917
        newuser.save()
918
        r = cl_newuser.get(newuser.get_activation_url())
919
        newuser = User.objects.get(email="newuser@synnefo.org")
920
        self.assertFalse(newuser.is_active)
921

  
922
        # release suspension
923
        newuser.is_active = True
924
        newuser.save()
925

  
926
        cl_newuser.get('/im/login/shibboleth?', follow=True)
927
        local = auth.get_provider('local', newuser)
928
        self.assertEqual(local.get_add_policy, False)
929
        self.assertEqual(local.get_login_policy, False)
930
        r = cl_newuser.get(local.get_add_url, follow=True)
931
        self.assertRedirects(r, '/im/profile')
932
        self.assertContains(r, 'disabled for your')
933

  
934
        cl_olduser.login(username='olduser@synnefo.org', password="password")
935
        r = cl_olduser.get('/im/profile', follow=True)
936
        self.assertEqual(r.status_code, 200)
937
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
938
        self.assertContains(r, 'Your request is missing a unique token')
939
        cl_olduser.set_tokens(eppn="newusereppn")
940
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
941
        self.assertContains(r, 'is already assigned to another user')
942
        cl_olduser.set_tokens(eppn="oldusereppn")
943
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
944
        self.assertContains(r, 'Academic login enabled for this account')
945

  
946
        user = User.objects.get(email="olduser@synnefo.org")
947
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
948
        local_provider = user.get_auth_provider('local')
949
        self.assertEqual(shib_provider.get_remove_policy, True)
950
        self.assertEqual(local_provider.get_remove_policy, True)
951

  
952
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
953
                                                          'shibboleth',
954
                                                          academic_group,
955
                                                          remove=False)
956
        user.groups.add(academic_group)
957
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
958
        local_provider = user.get_auth_provider('local')
959
        self.assertEqual(shib_provider.get_remove_policy, False)
960
        self.assertEqual(local_provider.get_remove_policy, True)
961
        self.assertEqual(local_provider.get_login_policy, False)
962

  
963
        cl_olduser.logout()
964
        login_data = {'username': 'olduser@synnefo.org', 'password': 'password'}
965
        r = cl_olduser.post('/im/local', login_data, follow=True)
966
        self.assertContains(r, "href='/im/login/shibboleth'>Academic login")
967
        Group.objects.all().delete()
968

  
969

  
970
class TestAuthProvidersAPI(TestCase):
971
    """
972
    Test auth_providers module API
973
    """
974

  
975
    @im_settings(IM_MODULES=['local', 'shibboleth'])
976
    def test_create(self):
977
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
978
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
979

  
980
        module = 'shibboleth'
981
        identifier = 'SHIB_UUID'
982
        provider_params = {
983
            'affiliation': 'UNIVERSITY',
984
            'info': {'age': 27}
985
        }
986
        provider = auth.get_provider(module, user2, identifier,
987
                                     **provider_params)
988
        provider.add_to_user()
989
        provider = auth.get_provider(module, user, identifier,
990
                                     **provider_params)
991
        provider.add_to_user()
992
        user.email_verified = True
993
        user.save()
994
        self.assertRaises(Exception, provider.add_to_user)
995
        provider = user.get_auth_provider(module, identifier)
996
        self.assertEqual(user.get_auth_provider(
997
            module, identifier)._instance.info.get('age'), 27)
998

  
999
        module = 'local'
1000
        identifier = None
1001
        provider_params = {'auth_backend': 'ldap', 'info':
1002
                          {'office': 'A1'}}
1003
        provider = auth.get_provider(module, user, identifier,
1004
                                     **provider_params)
1005
        provider.add_to_user()
1006
        self.assertFalse(provider.get_add_policy)
1007
        self.assertRaises(Exception, provider.add_to_user)
1008

  
1009
        shib = user.get_auth_provider('shibboleth',
1010
                                      'SHIB_UUID')
1011
        self.assertTrue(shib.get_remove_policy)
1012

  
1013
        local = user.get_auth_provider('local')
1014
        self.assertTrue(local.get_remove_policy)
1015

  
1016
        local.remove_from_user()
1017
        self.assertFalse(shib.get_remove_policy)
1018
        self.assertRaises(Exception, shib.remove_from_user)
1019

  
1020
        provider = user.get_auth_providers()[0]
1021
        self.assertRaises(Exception, provider.add_to_user)
1022

  
1023
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1024
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
1025
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
1026
                                                 'group2'])
1027
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
1028
                        CREATION_GROUPS_POLICY=['localgroup-create',
1029
                                                'group-create'])
1030
    def test_add_groups(self):
1031
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
1032
        provider = auth.get_provider('shibboleth', user, 'test123')
1033
        provider.add_to_user()
1034
        user = AstakosUser.objects.get()
1035
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1036
                              sorted([u'group1', u'group2', u'group-create']))
1037

  
1038
        local = auth.get_provider('local', user)
1039
        local.add_to_user()
1040
        provider = user.get_auth_provider('shibboleth')
1041
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1042
        provider.remove_from_user()
1043
        user = AstakosUser.objects.get()
1044
        self.assertEqual(len(user.get_auth_providers()), 1)
1045
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1046
                              sorted([u'group-create', u'localgroup']))
1047

  
1048
        local = user.get_auth_provider('local')
1049
        self.assertRaises(Exception, local.remove_from_user)
1050
        provider = auth.get_provider('shibboleth', user, 'test123')
1051
        provider.add_to_user()
1052
        user = AstakosUser.objects.get()
1053
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1054
                              sorted([u'group-create', u'group1', u'group2',
1055
                               u'localgroup']))
1056
        Group.objects.all().delete()
1057

  
1058

  
1059

  
1060
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1061
    def test_policies(self):
1062
        group_old, created = Group.objects.get_or_create(name='olduser')
1063

  
1064
        astakos_settings.MODERATION_ENABLED = True
1065
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1066
            ['academic-user']
1067
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1068
            ['google-user']
1069

  
1070
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
1071
        user.groups.add(group_old)
1072
        user.add_auth_provider('local')
1073

  
1074
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
1075
        user2.add_auth_provider('shibboleth', identifier='shibid')
1076

  
1077
        user3 = AstakosUser.objects.create(email="kpap3@synnefo.org")
1078
        user3.groups.add(group_old)
1079
        user3.add_auth_provider('local')
1080
        user3.add_auth_provider('shibboleth', identifier='1234')
1081

  
1082
        self.assertTrue(user2.groups.get(name='academic-user'))
1083
        self.assertFalse(user2.groups.filter(name='olduser').count())
1084

  
1085
        local = auth_providers.get_provider('local')
1086
        self.assertTrue(local.get_add_policy)
1087

  
1088
        academic_group = Group.objects.get(name='academic-user')
1089
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1090
                                                     academic_group,
1091
                                                     exclusive=True,
1092
                                                     add=False,
1093
                                                     login=False)
1094
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1095
                                                     academic_group,
1096
                                                     exclusive=True,
1097
                                                     login=False,
1098
                                                     add=False)
1099
        # no duplicate entry gets created
1100
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1101

  
1102
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1103
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1104
                                                     user2,
1105
                                                     remove=False)
1106
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1107

  
1108
        local = auth_providers.get_provider('local', user2)
1109
        google = auth_providers.get_provider('google', user2)
1110
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1111
        self.assertTrue(shibboleth.get_login_policy)
1112
        self.assertFalse(shibboleth.get_remove_policy)
1113
        self.assertFalse(local.get_add_policy)
1114
        self.assertFalse(local.get_add_policy)
1115
        self.assertFalse(google.get_add_policy)
1116

  
1117
        user2.groups.remove(Group.objects.get(name='academic-user'))
1118
        self.assertTrue(local.get_add_policy)
1119
        self.assertTrue(google.get_add_policy)
1120
        user2.groups.add(Group.objects.get(name='academic-user'))
1121

  
1122
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1123
                                                     user2,
1124
                                                     exclusive=True,
1125
                                                     add=True)
1126
        self.assertTrue(local.get_add_policy)
1127
        self.assertTrue(google.get_add_policy)
1128

  
1129
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1130
        self.assertFalse(local.get_automoderate_policy)
1131
        self.assertFalse(google.get_automoderate_policy)
1132
        self.assertTrue(shibboleth.get_automoderate_policy)
1133

  
1134
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1135
                  'GOOGLE_ADD_GROUPS_POLICY']:
1136
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1137

  
1138

  
1139
    @shibboleth_settings(CREATE_POLICY=True)
1140
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1141
    def test_create_http(self):
1142
        # this should be wrapped inside a transaction
1143
        user = AstakosUser(email="test@test.com")
1144
        user.save()
1145
        provider = auth_providers.get_provider('shibboleth', user,
1146
                                               'test@academia.test')
1147
        provider.add_to_user()
1148
        user.get_auth_provider('shibboleth', 'test@academia.test')
1149
        provider = auth_providers.get_provider('local', user)
1150
        provider.add_to_user()
1151
        user.get_auth_provider('local')
1152

  
1153
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1154
        user = AstakosUser(email="test2@test.com")
1155
        user.save()
1156
        provider = auth_providers.get_provider('shibboleth', user,
1157
                                               'test@shibboleth.com',
1158
                                               **{'info': {'name':
1159
                                                                'User Test'}})
1160
        self.assertFalse(provider.get_create_policy)
1161
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1162
        self.assertTrue(provider.get_create_policy)
1163
        academic = provider.add_to_user()
1164

  
1165
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1166
    @shibboleth_settings(LIMIT_POLICY=2)
1167
    def test_policies(self):
1168
        user = get_local_user('kpap@synnefo.org')
1169
        user.add_auth_provider('shibboleth', identifier='1234')
1170
        user.add_auth_provider('shibboleth', identifier='12345')
1171

  
1172
        # default limit is 1
1173
        local = user.get_auth_provider('local')
1174
        self.assertEqual(local.get_add_policy, False)
1175

  
1176
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1177
        academic = user.get_auth_provider('shibboleth',
1178
                                          identifier='1234')
1179
        self.assertEqual(academic.get_add_policy, False)
1180
        newacademic = auth_providers.get_provider('shibboleth', user,
1181
                                                  identifier='123456')
1182
        self.assertEqual(newacademic.get_add_policy, True)
1183
        user.add_auth_provider('shibboleth', identifier='123456')
1184
        self.assertEqual(academic.get_add_policy, False)
1185
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1186

  
1187
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1188
    @shibboleth_settings(LIMIT_POLICY=2)
1189
    def test_messages(self):
1190
        user = get_local_user('kpap@synnefo.org')
1191
        user.add_auth_provider('shibboleth', identifier='1234')
1192
        user.add_auth_provider('shibboleth', identifier='12345')
1193
        provider = auth_providers.get_provider('shibboleth')
1194
        self.assertEqual(provider.get_message('title'), 'Academic')
1195
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1196
        # regenerate messages cache
1197
        provider = auth_providers.get_provider('shibboleth')
1198
        self.assertEqual(provider.get_message('title'), 'New title')
1199
        self.assertEqual(provider.get_message('login_title'),
1200
                         'New title LOGIN')
1201
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1202
        self.assertEqual(provider.get_module_icon,
1203
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1204
        self.assertEqual(provider.get_module_medium_icon,
1205
                         settings.MEDIA_URL +
1206
                         'im/auth/icons-medium/shibboleth.png')
1207

  
1208
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1209
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1210
        self.assertEqual(provider.get_method_details_msg,
1211
                         'Account: 12345')
1212
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1213
        self.assertEqual(provider.get_method_details_msg,
1214
                         'Account: 1234')
1215

  
1216
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1217
        self.assertEqual(provider.get_not_active_msg,
1218
                         "'Academic login' is disabled.")
1219

  
1220
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1221
    @shibboleth_settings(LIMIT_POLICY=2)
1222
    def test_templates(self):
1223
        user = get_local_user('kpap@synnefo.org')
1224
        user.add_auth_provider('shibboleth', identifier='1234')
1225
        user.add_auth_provider('shibboleth', identifier='12345')
1226

  
1227
        provider = auth_providers.get_provider('shibboleth')
1228
        self.assertEqual(provider.get_template('login'),
1229
                         'im/auth/shibboleth_login.html')
1230
        provider = auth_providers.get_provider('google')
1231
        self.assertEqual(provider.get_template('login'),
1232
                         'im/auth/generic_login.html')
1233

  
1234

  
1235
class TestActivationBackend(TestCase):
1236

  
1237
    def setUp(self):
1238
        # dummy call to pass through logging middleware
1239
        self.client.get('/im/')
1240

  
1241
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1242
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1243
    def test_policies(self):
1244
        backend = activation_backends.get_backend()
1245

  
1246
        # email matches RE_USER_EMAIL_PATTERNS
1247
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1248
                               is_active=False, email_verified=False)
1249
        backend.handle_verification(user1, user1.verification_code)
1250
        self.assertEqual(user1.accepted_policy, 'email')
1251

  
1252
        # manually moderated
1253
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1254
                               is_active=False, email_verified=False)
1255

  
1256
        backend.handle_verification(user2, user2.verification_code)
1257
        self.assertEqual(user2.moderated, False)
1258
        backend.handle_moderation(user2)
1259
        self.assertEqual(user2.moderated, True)
1260
        self.assertEqual(user2.accepted_policy, 'manual')
1261

  
1262
        # autoaccept due to provider automoderate policy
1263
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1264
                               is_active=False, email_verified=False)
1265
        user3.auth_providers.all().delete()
1266
        user3.add_auth_provider('shibboleth', identifier='shib123')
1267
        backend.handle_verification(user3, user3.verification_code)
1268
        self.assertEqual(user3.moderated, True)
1269
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1270

  
1271
    @im_settings(MODERATION_ENABLED=False,
1272
                 MANAGERS=(('Manager',
1273
                            'manager@synnefo.org'),),
1274
                 HELPDESK=(('Helpdesk',
1275
                            'helpdesk@synnefo.org'),),
1276
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1277
    def test_without_moderation(self):
1278
        backend = activation_backends.get_backend()
1279
        form = backend.get_signup_form('local')
1280
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1281

  
1282
        user_data = {
1283
            'email': 'kpap@synnefo.org',
1284
            'first_name': 'Kostas Papas',
1285
            'password1': '123',
1286
            'password2': '123'
1287
        }
1288
        form = backend.get_signup_form('local', user_data)
1289
        user = form.save(commit=False)
1290
        form.store_user(user)
1291
        self.assertEqual(user.is_active, False)
1292
        self.assertEqual(user.email_verified, False)
1293

  
1294
        # step one, registration
1295
        result = backend.handle_registration(user)
1296
        user = AstakosUser.objects.get()
1297
        self.assertEqual(user.is_active, False)
1298
        self.assertEqual(user.email_verified, False)
1299
        self.assertTrue(user.verification_code)
1300
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1301
        backend.send_result_notifications(result, user)
1302
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1303
        self.assertEqual(len(mail.outbox), 1)
1304

  
1305
        # step two, verify email (automatically
1306
        # moderates/accepts user, since moderation is disabled)
1307
        user = AstakosUser.objects.get()
1308
        valid_code = user.verification_code
1309

  
1310
        # test invalid code
1311
        result = backend.handle_verification(user, valid_code)
1312
        backend.send_result_notifications(result, user)
1313
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1314
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1315
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1316
        # verification + activated + greeting = 3
1317
        self.assertEqual(len(mail.outbox), 3)
1318
        user = AstakosUser.objects.get()
1319
        self.assertEqual(user.is_active, True)
1320
        self.assertEqual(user.moderated, True)
1321
        self.assertTrue(user.moderated_at)
1322
        self.assertEqual(user.email_verified, True)
1323
        self.assertTrue(user.activation_sent)
1324

  
1325
    @im_settings(MODERATION_ENABLED=True,
1326
                 MANAGERS=(('Manager',
1327
                            'manager@synnefo.org'),),
1328
                 HELPDESK=(('Helpdesk',
1329
                            'helpdesk@synnefo.org'),),
1330
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1331
    def test_with_moderation(self):
1332

  
1333
        backend = activation_backends.get_backend()
1334
        form = backend.get_signup_form('local')
1335
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1336

  
1337
        user_data = {
1338
            'email': 'kpap@synnefo.org',
1339
            'first_name': 'Kostas Papas',
1340
            'password1': '123',
1341
            'password2': '123'
1342
        }
1343
        form = backend.get_signup_form(provider='local',
1344
                                       initial_data=user_data)
1345
        user = form.save(commit=False)
1346
        form.store_user(user)
1347
        self.assertEqual(user.is_active, False)
1348
        self.assertEqual(user.email_verified, False)
1349

  
1350
        # step one, registration
1351
        result = backend.handle_registration(user)
1352
        user = AstakosUser.objects.get()
1353
        self.assertEqual(user.is_active, False)
1354
        self.assertEqual(user.email_verified, False)
1355
        self.assertTrue(user.verification_code)
1356
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1357
        backend.send_result_notifications(result, user)
1358
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1359
        self.assertEqual(len(mail.outbox), 1)
1360

  
1361
        # step two, verifying email
1362
        user = AstakosUser.objects.get()
1363
        valid_code = user.verification_code
1364
        invalid_code = user.verification_code + 'invalid'
1365

  
1366
        # test invalid code
1367
        result = backend.handle_verification(user, invalid_code)
1368
        self.assertEqual(result.status, backend.Result.ERROR)
1369
        backend.send_result_notifications(result, user)
1370
        user = AstakosUser.objects.get()
1371
        self.assertEqual(user.is_active, False)
1372
        self.assertEqual(user.moderated, False)
1373
        self.assertEqual(user.moderated_at, None)
1374
        self.assertEqual(user.email_verified, False)
1375
        self.assertTrue(user.activation_sent)
1376

  
1377
        # test valid code
1378
        user = AstakosUser.objects.get()
1379
        result = backend.handle_verification(user, valid_code)
1380
        backend.send_result_notifications(result, user)
1381
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1382
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1383
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1384
        self.assertEqual(len(mail.outbox), 2)
1385
        user = AstakosUser.objects.get()
1386
        self.assertEqual(user.moderated, False)
1387
        self.assertEqual(user.moderated_at, None)
1388
        self.assertEqual(user.email_verified, True)
1389
        self.assertTrue(user.activation_sent)
1390

  
1391
        # test code reuse
1392
        result = backend.handle_verification(user, valid_code)
1393
        self.assertEqual(result.status, backend.Result.ERROR)
1394
        user = AstakosUser.objects.get()
1395
        self.assertEqual(user.is_active, False)
1396
        self.assertEqual(user.moderated, False)
1397
        self.assertEqual(user.moderated_at, None)
1398
        self.assertEqual(user.email_verified, True)
1399
        self.assertTrue(user.activation_sent)
1400

  
1401
        # valid code on verified user
1402
        user = AstakosUser.objects.get()
1403
        valid_code = user.verification_code
1404
        result = backend.handle_verification(user, valid_code)
1405
        self.assertEqual(result.status, backend.Result.ERROR)
1406

  
1407
        # step three, moderation user
1408
        user = AstakosUser.objects.get()
1409
        result = backend.handle_moderation(user)
1410
        backend.send_result_notifications(result, user)
1411

  
1412
        user = AstakosUser.objects.get()
1413
        self.assertEqual(user.is_active, True)
1414
        self.assertEqual(user.moderated, True)
1415
        self.assertTrue(user.moderated_at)
1416
        self.assertEqual(user.email_verified, True)
1417
        self.assertTrue(user.activation_sent)
1418

  
1419

  
1420
class TestProjects(TestCase):
1421
    """
1422
    Test projects.
1423
    """
1424
    def setUp(self):
1425
        # astakos resources
1426
        self.astakos_service = Service.objects.create(name="astakos",
1427
                                                      api_url="/astakos/api/")
1428
        self.resource = Resource.objects.create(name="astakos.pending_app",
1429
                                                uplimit=0,
1430
                                                allow_in_projects=False,
1431
                                                service=self.astakos_service)
1432

  
1433
        # custom service resources
1434
        self.service = Service.objects.create(name="service1",
1435
                                              api_url="http://service.api")
1436
        self.resource = Resource.objects.create(name="service1.resource",
1437
                                                uplimit=100,
1438
                                                service=self.service)
1439
        self.admin = get_local_user("projects-admin@synnefo.org")
1440
        self.admin.uuid = 'uuid1'
1441
        self.admin.save()
1442

  
1443
        self.user = get_local_user("user@synnefo.org")
1444
        self.member = get_local_user("member@synnefo.org")
1445
        self.member2 = get_local_user("member2@synnefo.org")
1446

  
1447
        self.admin_client = get_user_client("projects-admin@synnefo.org")
1448
        self.user_client = get_user_client("user@synnefo.org")
1449
        self.member_client = get_user_client("member@synnefo.org")
1450
        self.member2_client = get_user_client("member2@synnefo.org")
1451

  
1452
        quotas.qh_sync_users(AstakosUser.objects.all())
1453

  
1454
    @im_settings(PROJECT_ADMINS=['uuid1'])
1455
    def test_application_limit(self):
1456
        # user cannot create a project
1457
        r = self.user_client.get(reverse('project_add'), follow=True)
1458
        self.assertRedirects(r, reverse('project_list'))
1459
        self.assertContains(r, "You are not allowed to create a new project")
1460

  
1461
        # but admin can
1462
        r = self.admin_client.get(reverse('project_add'), follow=True)
1463
        self.assertRedirects(r, reverse('project_add'))
1464

  
1465
    @im_settings(PROJECT_ADMINS=['uuid1'])
1466
    def test_allow_in_project(self):
1467
        dfrom = datetime.now()
1468
        dto = datetime.now() + timedelta(days=30)
1469

  
1470
        # astakos.pending_uplimit allow_in_project flag is False
1471
        # we shouldn't be able to create a project application using this
1472
        # resource.
1473
        application_data = {
1474
            'name': 'project.synnefo.org',
1475
            'homepage': 'https://www.synnefo.org',
1476
            'start_date': dfrom.strftime("%Y-%m-%d"),
1477
            'end_date': dto.strftime("%Y-%m-%d"),
1478
            'member_join_policy': 2,
1479
            'member_leave_policy': 1,
1480
            'service1.resource_uplimit': 100,
1481
            'is_selected_service1.resource': "1",
1482
            'astakos.pending_app_uplimit': 100,
1483
            'is_selected_accounts': "1",
1484
            'user': self.user.pk
1485
        }
1486
        form = forms.ProjectApplicationForm(data=application_data)
1487
        # form is invalid
1488
        self.assertEqual(form.is_valid(), False)
1489

  
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff