Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests.py @ 95150b7d

History | View | Annotate | Download (59.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from contextlib import contextmanager
34

    
35
import copy
36
import datetime
37
import functools
38

    
39
from snf_django.utils.testing import with_settings, override_settings
40

    
41
from django.test import TestCase, Client
42
from django.core import mail
43
from django.http import SimpleCookie, HttpRequest, QueryDict
44
from django.utils.importlib import import_module
45

    
46
from astakos.im.activation_backends import *
47
from astakos.im.target.shibboleth import Tokens as ShibbolethTokens
48
from astakos.im.models import *
49
from astakos.im import functions
50
from astakos.im import settings as astakos_settings
51
from astakos.im import forms
52

    
53
from urllib import quote
54
from datetime import timedelta
55

    
56
from astakos.im import messages
57
from astakos.im import auth_providers
58
from astakos.im import quotas
59

    
60
from django.conf import settings
61

    
62

    
63
# set some common settings
64
astakos_settings.EMAILCHANGE_ENABLED = True
65
astakos_settings.RECAPTCHA_ENABLED = False
66

    
67
settings.LOGGING_SETUP['disable_existing_loggers'] = False
68

    
69
# shortcut decorators to override provider settings
70
# e.g. shibboleth_settings(ENABLED=True) will set
71
# ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_ENABLED = True in global synnefo settings
72
prefixes = {'providers': 'AUTH_PROVIDER_',
73
            'shibboleth': 'ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_',
74
            'local': 'ASTAKOS_AUTH_PROVIDER_LOCAL_'}
75
im_settings = functools.partial(with_settings, astakos_settings)
76
shibboleth_settings = functools.partial(with_settings,
77
                                        settings,
78
                                        prefix=prefixes['shibboleth'])
79
localauth_settings = functools.partial(with_settings, settings,
80
                                       prefix=prefixes['local'])
81

    
82

    
83
class AstakosTestClient(Client):
84
    pass
85

    
86

    
87
class ShibbolethClient(AstakosTestClient):
88
    """
89
    A shibboleth agnostic client.
90
    """
91
    VALID_TOKENS = filter(lambda x: not x.startswith("_"),
92
                          dir(ShibbolethTokens))
93

    
94
    def __init__(self, *args, **kwargs):
95
        self.tokens = kwargs.pop('tokens', {})
96
        super(ShibbolethClient, self).__init__(*args, **kwargs)
97

    
98
    def set_tokens(self, **kwargs):
99
        for key, value in kwargs.iteritems():
100
            key = 'SHIB_%s' % key.upper()
101
            if not key in self.VALID_TOKENS:
102
                raise Exception('Invalid shibboleth token')
103

    
104
            self.tokens[key] = value
105

    
106
    def unset_tokens(self, *keys):
107
        for key in keys:
108
            key = 'SHIB_%s' % param.upper()
109
            if key in self.tokens:
110
                del self.tokens[key]
111

    
112
    def reset_tokens(self):
113
        self.tokens = {}
114

    
115
    def get_http_token(self, key):
116
        http_header = getattr(ShibbolethTokens, key)
117
        return http_header
118

    
119
    def request(self, **request):
120
        """
121
        Transform valid shibboleth tokens to http headers
122
        """
123
        for token, value in self.tokens.iteritems():
124
            request[self.get_http_token(token)] = value
125

    
126
        for param in request.keys():
127
            key = 'SHIB_%s' % param.upper()
128
            if key in self.VALID_TOKENS:
129
                request[self.get_http_token(key)] = request[param]
130
                del request[param]
131

    
132
        return super(ShibbolethClient, self).request(**request)
133

    
134

    
135
def get_user_client(username, password="password"):
136
    client = Client()
137
    client.login(username=username, password=password)
138
    return client
139

    
140

    
141
def get_local_user(username, **kwargs):
142
        try:
143
            return AstakosUser.objects.get(email=username)
144
        except:
145
            user_params = {
146
                'username': username,
147
                'email': username,
148
                'is_active': True,
149
                'activation_sent': datetime.now(),
150
                'email_verified': True,
151
                'provider': 'local'
152
            }
153
            user_params.update(kwargs)
154
            user = AstakosUser(**user_params)
155
            user.set_password(kwargs.get('password', 'password'))
156
            user.save()
157
            user.add_auth_provider('local', auth_backend='astakos')
158
            if kwargs.get('is_active', True):
159
                user.is_active = True
160
            else:
161
                user.is_active = False
162
            user.save()
163
            return user
164

    
165

    
166
def get_mailbox(email):
167
    mails = []
168
    for sent_email in mail.outbox:
169
        for recipient in sent_email.recipients():
170
            if email in recipient:
171
                mails.append(sent_email)
172
    return mails
173

    
174

    
175
class ShibbolethTests(TestCase):
176
    """
177
    Testing shibboleth authentication.
178
    """
179

    
180
    fixtures = ['groups']
181

    
182
    def setUp(self):
183
        self.client = ShibbolethClient()
184
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
185
        astakos_settings.MODERATION_ENABLED = True
186

    
187
    @im_settings(FORCE_PROFILE_UPDATE=False)
188
    def test_create_account(self):
189

    
190
        client = ShibbolethClient()
191

    
192
        # shibboleth views validation
193
        # eepn required
194
        r = client.get('/im/login/shibboleth?', follow=True)
195
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
196
            'domain': astakos_settings.BASEURL,
197
            'contact_email': settings.CONTACT_EMAIL
198
        })
199
        client.set_tokens(eppn="kpapeppn")
200

    
201
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
202
        # shibboleth user info required
203
        r = client.get('/im/login/shibboleth?', follow=True)
204
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
205
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
206

    
207
        # shibboleth logged us in
208
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn",
209
                          cn="Kostas Papadimitriou",
210
                          ep_affiliation="Test Affiliation")
211
        r = client.get('/im/login/shibboleth?', follow=True)
212
        token = PendingThirdPartyUser.objects.get().token
213
        self.assertRedirects(r, '/im/signup?third_party_token=%s' % token)
214
        self.assertEqual(r.status_code, 200)
215

    
216
        # a new pending user created
217
        pending_user = PendingThirdPartyUser.objects.get(
218
            third_party_identifier="kpapeppn")
219
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
220
        # keep the token for future use
221
        token = pending_user.token
222
        # from now on no shibboleth headers are sent to the server
223
        client.reset_tokens()
224

    
225
        # this is the old way, it should fail, to avoid pending user take over
226
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
227
        self.assertEqual(r.status_code, 404)
228

    
229
        # this is the signup unique url associated with the pending user
230
        # created
231
        r = client.get('/im/signup/?third_party_token=%s' % token)
232
        identifier = pending_user.third_party_identifier
233
        post_data = {'third_party_identifier': identifier,
234
                     'first_name': 'Kostas',
235
                     'third_party_token': token,
236
                     'last_name': 'Mitroglou',
237
                     'provider': 'shibboleth'}
238

    
239
        signup_url = reverse('signup')
240

    
241
        # invlid email
242
        post_data['email'] = 'kpap'
243
        r = client.post(signup_url, post_data)
244
        self.assertContains(r, token)
245

    
246
        # existing email
247
        existing_user = get_local_user('test@test.com')
248
        post_data['email'] = 'test@test.com'
249
        r = client.post(signup_url, post_data)
250
        self.assertContains(r, messages.EMAIL_USED)
251
        existing_user.delete()
252

    
253
        # and finally a valid signup
254
        post_data['email'] = 'kpap@grnet.gr'
255
        r = client.post(signup_url, post_data, follow=True)
256
        self.assertContains(r, messages.NOTIFICATION_SENT)
257

    
258
        # everything is ok in our db
259
        self.assertEqual(AstakosUser.objects.count(), 1)
260
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
261
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
262

    
263
        # provider info stored
264
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
265
        self.assertEqual(provider.affiliation, 'Test Affiliation')
266
        self.assertEqual(provider.info, {u'email': u'kpap@grnet.gr',
267
                                         u'eppn': u'kpapeppn',
268
                                         u'name': u'Kostas Papadimitriou'})
269

    
270
        # lets login (not activated yet)
271
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn",
272
                          cn="Kostas Papadimitriou", )
273
        r = client.get("/im/login/shibboleth?", follow=True)
274
        self.assertContains(r, 'is pending moderation')
275

    
276
        # admin activates our user
277
        u = AstakosUser.objects.get(username="kpap@grnet.gr")
278
        functions.activate(u)
279
        self.assertEqual(u.is_active, True)
280

    
281
        # we see our profile
282
        r = client.get("/im/login/shibboleth?", follow=True)
283
        self.assertRedirects(r, '/im/landing')
284
        self.assertEqual(r.status_code, 200)
285

    
286
    def test_existing(self):
287
        """
288
        Test adding of third party login to an existing account
289
        """
290

    
291
        # this is our existing user
292
        existing_user = get_local_user('kpap@grnet.gr')
293
        existing_inactive = get_local_user('kpap-inactive@grnet.gr')
294
        existing_inactive.is_active = False
295
        existing_inactive.save()
296

    
297
        existing_unverified = get_local_user('kpap-unverified@grnet.gr')
298
        existing_unverified.is_active = False
299
        existing_unverified.activation_sent = None
300
        existing_unverified.email_verified = False
301
        existing_unverified.is_verified = False
302
        existing_unverified.save()
303

    
304
        client = ShibbolethClient()
305
        # shibboleth logged us in, notice that we use different email
306
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
307
                          cn="Kostas Papadimitriou", )
308
        r = client.get("/im/login/shibboleth?", follow=True)
309

    
310
        # a new pending user created
311
        pending_user = PendingThirdPartyUser.objects.get()
312
        token = pending_user.token
313
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
314
        pending_key = pending_user.token
315
        client.reset_tokens()
316
        self.assertRedirects(r, "/im/signup?third_party_token=%s" % token)
317

    
318
        form = r.context['login_form']
319
        signupdata = copy.copy(form.initial)
320
        signupdata['email'] = 'kpap@grnet.gr'
321
        signupdata['third_party_token'] = token
322
        signupdata['provider'] = 'shibboleth'
323
        signupdata.pop('id', None)
324

    
325
        # the email exists to another user
326
        r = client.post("/im/signup", signupdata)
327
        self.assertContains(r, "There is already an account with this email "
328
                               "address")
329
        # change the case, still cannot create
330
        signupdata['email'] = 'KPAP@grnet.GR'
331
        r = client.post("/im/signup", signupdata)
332
        self.assertContains(r, "There is already an account with this email "
333
                               "address")
334
        # inactive user
335
        signupdata['email'] = 'KPAP-inactive@grnet.GR'
336
        r = client.post("/im/signup", signupdata)
337
        self.assertContains(r, "There is already an account with this email "
338
                               "address")
339

    
340
        # unverified user, this should pass, old entry will be deleted
341
        signupdata['email'] = 'KAPAP-unverified@grnet.GR'
342
        r = client.post("/im/signup", signupdata)
343

    
344
        post_data = {'password': 'password',
345
                     'username': 'kpap@grnet.gr'}
346
        r = client.post('/im/local', post_data, follow=True)
347
        self.assertTrue(r.context['request'].user.is_authenticated())
348
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
349
                          cn="Kostas Papadimitriou", )
350
        r = client.get("/im/login/shibboleth?", follow=True)
351
        self.assertContains(r, "enabled for this account")
352
        client.reset_tokens()
353

    
354
        user = existing_user
355
        self.assertTrue(user.has_auth_provider('shibboleth'))
356
        self.assertTrue(user.has_auth_provider('local',
357
                                               auth_backend='astakos'))
358
        client.logout()
359

    
360
        # look Ma, i can login with both my shibboleth and local account
361
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
362
                          cn="Kostas Papadimitriou")
363
        r = client.get("/im/login/shibboleth?", follow=True)
364
        self.assertTrue(r.context['request'].user.is_authenticated())
365
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
366
        self.assertRedirects(r, '/im/landing')
367
        self.assertEqual(r.status_code, 200)
368
        client.logout()
369
        client.reset_tokens()
370

    
371
        # logged out
372
        r = client.get("/im/profile", follow=True)
373
        self.assertFalse(r.context['request'].user.is_authenticated())
374

    
375
        # login with local account also works
376
        post_data = {'password': 'password',
377
                     'username': 'kpap@grnet.gr'}
378
        r = self.client.post('/im/local', post_data, follow=True)
379
        self.assertTrue(r.context['request'].user.is_authenticated())
380
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
381
        self.assertRedirects(r, '/im/landing')
382
        self.assertEqual(r.status_code, 200)
383

    
384
        # cannot add the same eppn
385
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
386
                          cn="Kostas Papadimitriou", )
387
        r = client.get("/im/login/shibboleth?", follow=True)
388
        self.assertRedirects(r, '/im/landing')
389
        self.assertTrue(r.status_code, 200)
390
        self.assertEquals(existing_user.auth_providers.count(), 2)
391

    
392
        # only one allowed by default
393
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
394
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
395
        prov = auth_providers.get_provider('shibboleth')
396
        r = client.get("/im/login/shibboleth?", follow=True)
397
        self.assertContains(r, "Failed to add")
398
        self.assertRedirects(r, '/im/profile')
399
        self.assertTrue(r.status_code, 200)
400
        self.assertEquals(existing_user.auth_providers.count(), 2)
401
        client.logout()
402
        client.reset_tokens()
403

    
404
        # cannot login with another eppn
405
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppninvalid",
406
                          cn="Kostas Papadimitriou")
407
        r = client.get("/im/login/shibboleth?", follow=True)
408
        self.assertFalse(r.context['request'].user.is_authenticated())
409

    
410
        # cannot
411

    
412
        # lets remove local password
413
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
414
                                       email="kpap@grnet.gr")
415
        remove_local_url = user.get_auth_provider('local').get_remove_url
416
        remove_shibbo_url = user.get_auth_provider('shibboleth',
417
                                                   'kpapeppn').get_remove_url
418
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
419
                          cn="Kostas Papadimtriou")
420
        r = client.get("/im/login/shibboleth?", follow=True)
421
        client.reset_tokens()
422

    
423
        # TODO: this view should use POST
424
        r = client.get(remove_local_url)
425
        # 2 providers left
426
        self.assertEqual(user.auth_providers.count(), 1)
427
        # cannot remove last provider
428
        r = client.get(remove_shibbo_url)
429
        self.assertEqual(r.status_code, 403)
430
        self.client.logout()
431

    
432
        # cannot login using local credentials (notice we use another client)
433
        post_data = {'password': 'password',
434
                     'username': 'kpap@grnet.gr'}
435
        r = self.client.post('/im/local', post_data, follow=True)
436
        self.assertFalse(r.context['request'].user.is_authenticated())
437

    
438
        # we can reenable the local provider by setting a password
439
        r = client.get("/im/password_change", follow=True)
440
        r = client.post("/im/password_change", {'new_password1': '111',
441
                                                'new_password2': '111'},
442
                        follow=True)
443
        user = r.context['request'].user
444
        self.assertTrue(user.has_auth_provider('local'))
445
        self.assertTrue(user.has_auth_provider('shibboleth'))
446
        self.assertTrue(user.check_password('111'))
447
        self.assertTrue(user.has_usable_password())
448
        self.client.logout()
449

    
450
        # now we can login
451
        post_data = {'password': '111',
452
                     'username': 'kpap@grnet.gr'}
453
        r = self.client.post('/im/local', post_data, follow=True)
454
        self.assertTrue(r.context['request'].user.is_authenticated())
455

    
456
        client.reset_tokens()
457

    
458
        # we cannot take over another shibboleth identifier
459
        user2 = get_local_user('another@grnet.gr')
460
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
461
        # login
462
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
463
                          cn="Kostas Papadimitriou")
464
        r = client.get("/im/login/shibboleth?", follow=True)
465
        # try to assign existing shibboleth identifier of another user
466
        client.set_tokens(mail="kpap_second@shibboleth.gr",
467
                          eppn="existingeppn", cn="Kostas Papadimitriou")
468
        r = client.get("/im/login/shibboleth?", follow=True)
469
        self.assertContains(r, "this account is already assigned")
470

    
471

    
472
class TestLocal(TestCase):
473

    
474
    fixtures = ['groups']
475

    
476
    def setUp(self):
477
        settings.ADMINS = (('admin', 'support@cloud.grnet.gr'),)
478
        settings.SERVER_EMAIL = 'no-reply@grnet.gr'
479
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
480
        settings.ASTAKOS_MODERATION_ENABLED = True
481

    
482
    def tearDown(self):
483
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
484

    
485
    def test_no_moderation(self):
486
        # disable moderation
487
        astakos_settings.MODERATION_ENABLED = False
488

    
489
        # create a new user
490
        r = self.client.get("/im/signup")
491
        self.assertEqual(r.status_code, 200)
492
        data = {'email': 'kpap@grnet.gr', 'password1': 'password',
493
                'password2': 'password', 'first_name': 'Kostas',
494
                'last_name': 'Mitroglou', 'provider': 'local'}
495
        r = self.client.post("/im/signup", data)
496

    
497
        # user created
498
        self.assertEqual(AstakosUser.objects.count(), 1)
499
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
500
                                       email="kpap@grnet.gr")
501
        self.assertEqual(user.username, 'kpap@grnet.gr')
502
        self.assertEqual(user.has_auth_provider('local'), True)
503
        self.assertFalse(user.is_active)
504

    
505
        # user (but not admin) gets notified
506
        self.assertEqual(len(get_mailbox('support@cloud.grnet.gr')), 0)
507
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
508
        astakos_settings.MODERATION_ENABLED = True
509

    
510
    def test_email_case(self):
511
        data = {
512
            'email': 'kPap@grnet.gr',
513
            'password1': '1234',
514
            'password2': '1234'
515
        }
516

    
517
        form = forms.LocalUserCreationForm(data)
518
        self.assertTrue(form.is_valid())
519
        user = form.save()
520
        form.store_user(user, {})
521

    
522
        u = AstakosUser.objects.get()
523
        self.assertEqual(u.email, 'kPap@grnet.gr')
524
        self.assertEqual(u.username, 'kpap@grnet.gr')
525
        u.is_active = True
526
        u.email_verified = True
527
        u.save()
528

    
529
        data = {'username': 'kpap@grnet.gr', 'password': '1234'}
530
        login = forms.LoginForm(data=data)
531
        self.assertTrue(login.is_valid())
532

    
533
        data = {'username': 'KpaP@grnet.gr', 'password': '1234'}
534
        login = forms.LoginForm(data=data)
535
        self.assertTrue(login.is_valid())
536

    
537
        data = {
538
            'email': 'kpap@grnet.gr',
539
            'password1': '1234',
540
            'password2': '1234'
541
        }
542
        form = forms.LocalUserCreationForm(data)
543
        self.assertFalse(form.is_valid())
544

    
545
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),))
546
    @im_settings(FORCE_PROFILE_UPDATE=False)
547
    def test_local_provider(self):
548
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
549
        # enable moderation
550
        astakos_settings.MODERATION_ENABLED = True
551

    
552
        # create a user
553
        r = self.client.get("/im/signup")
554
        self.assertEqual(r.status_code, 200)
555
        data = {'email': 'kpap@grnet.gr', 'password1': 'password',
556
                'password2': 'password', 'first_name': 'Kostas',
557
                'last_name': 'Mitroglou', 'provider': 'local'}
558
        r = self.client.post("/im/signup", data)
559

    
560
        # user created
561
        self.assertEqual(AstakosUser.objects.count(), 1)
562
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
563
                                       email="kpap@grnet.gr")
564
        self.assertEqual(user.username, 'kpap@grnet.gr')
565
        self.assertEqual(user.has_auth_provider('local'), True)
566
        self.assertFalse(user.is_active)  # not activated
567
        self.assertFalse(user.email_verified)  # not verified
568
        self.assertFalse(user.activation_sent)  # activation automatically sent
569

    
570
        # admin gets notified and activates the user from the command line
571
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
572
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
573
                                           'password': 'password'})
574
        self.assertContains(r, messages.NOTIFICATION_SENT)
575
        functions.send_activation(user)
576

    
577
        # user activation fields updated and user gets notified via email
578
        user = AstakosUser.objects.get(pk=user.pk)
579
        self.assertTrue(user.activation_sent)
580
        self.assertFalse(user.email_verified)
581
        self.assertFalse(user.is_active)
582
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
583

    
584
        # user forgot she got registered and tries to submit registration
585
        # form. Notice the upper case in email
586
        data = {'email': 'KPAP@grnet.gr', 'password1': 'password',
587
                'password2': 'password', 'first_name': 'Kostas',
588
                'last_name': 'Mitroglou', 'provider': 'local'}
589
        r = self.client.post("/im/signup", data, follow=True)
590
        self.assertRedirects(r, reverse('index'))
591
        self.assertContains(r, messages.NOTIFICATION_SENT)
592

    
593
        user = AstakosUser.objects.get()
594
        functions.send_activation(user)
595

    
596
        # previous user replaced
597
        self.assertTrue(user.activation_sent)
598
        self.assertFalse(user.email_verified)
599
        self.assertFalse(user.is_active)
600
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 1)
601

    
602
        # hmmm, email exists; lets request a password change
603
        r = self.client.get('/im/local/password_reset')
604
        self.assertEqual(r.status_code, 200)
605
        data = {'email': 'kpap@grnet.gr'}
606
        r = self.client.post('/im/local/password_reset', data, follow=True)
607
        # she can't because account is not active yet
608
        self.assertContains(r, 'pending activation')
609

    
610
        # moderation is enabled and an activation email has already been sent
611
        # so user can trigger resend of the activation email
612
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
613
        self.assertContains(r, 'has been sent to your email address.')
614
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 2)
615

    
616
        # also she cannot login
617
        data = {'username': 'kpap@grnet.gr', 'password': 'password'}
618
        r = self.client.post('/im/local', data, follow=True)
619
        self.assertContains(r, 'Resend activation')
620
        self.assertFalse(r.context['request'].user.is_authenticated())
621
        self.assertFalse('_pithos2_a' in self.client.cookies)
622

    
623
        # user sees the message and resends activation
624
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
625
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 3)
626

    
627
        # switch back moderation setting
628
        astakos_settings.MODERATION_ENABLED = True
629
        r = self.client.get(user.get_activation_url(), follow=True)
630
        self.assertRedirects(r, "/im/landing")
631
        r = self.client.get('/im/profile', follow=True)
632
        self.assertTrue(r.context['request'].user.is_authenticated())
633
        self.assertTrue('_pithos2_a' in self.client.cookies)
634
        self.assertContains(r, "KPAP@grnet.gr")
635
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 4)
636

    
637
        user = AstakosUser.objects.get(pk=user.pk)
638
        # user activated and logged in, token cookie set
639
        self.assertTrue(r.context['request'].user.is_authenticated())
640
        self.assertTrue('_pithos2_a' in self.client.cookies)
641
        cookies = self.client.cookies
642
        self.assertTrue(quote(user.auth_token) in
643
                        cookies.get('_pithos2_a').value)
644
        r = self.client.get('/im/logout', follow=True)
645
        r = self.client.get('/im/')
646
        # user logged out, token cookie removed
647
        self.assertFalse(r.context['request'].user.is_authenticated())
648
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
649

    
650
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
651
        del self.client.cookies['_pithos2_a']
652

    
653
        # user can login
654
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
655
                                           'password': 'password'},
656
                             follow=True)
657
        self.assertTrue(r.context['request'].user.is_authenticated())
658
        self.assertTrue('_pithos2_a' in self.client.cookies)
659
        cookies = self.client.cookies
660
        self.assertTrue(quote(user.auth_token) in
661
                        cookies.get('_pithos2_a').value)
662
        self.client.get('/im/logout', follow=True)
663

    
664
        # user forgot password
665
        old_pass = user.password
666
        r = self.client.get('/im/local/password_reset')
667
        self.assertEqual(r.status_code, 200)
668
        r = self.client.post('/im/local/password_reset', {'email':
669
                                                          'kpap@grnet.gr'})
670
        self.assertEqual(r.status_code, 302)
671
        # email sent
672
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 5)
673

    
674
        # user visits change password link
675
        # "Refresh" user because created url is based on last_login timestamp
676
        user = AstakosUser.objects.ger(pk=user.pk)
677
        r = self.client.get(user.get_password_reset_url())
678
        r = self.client.post(user.get_password_reset_url(),
679
                             {'new_password1': 'newpass',
680
                              'new_password2': 'newpass'})
681

    
682
        user = AstakosUser.objects.get(pk=user.pk)
683
        self.assertNotEqual(old_pass, user.password)
684

    
685
        # old pass is not usable
686
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
687
                                           'password': 'password'})
688
        self.assertContains(r, 'Please enter a correct username and password')
689
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
690
                                           'password': 'newpass'},
691
                             follow=True)
692
        self.assertTrue(r.context['request'].user.is_authenticated())
693
        self.client.logout()
694

    
695
        # tests of special local backends
696
        user = AstakosUser.objects.get(pk=user.pk)
697
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
698
        user.save()
699

    
700
        # non astakos local backends do not support password reset
701
        r = self.client.get('/im/local/password_reset')
702
        self.assertEqual(r.status_code, 200)
703
        r = self.client.post('/im/local/password_reset', {'email':
704
                                                          'kpap@grnet.gr'})
705
        # she can't because account is not active yet
706
        self.assertContains(r, "Changing password is not")
707

    
708

    
709
class UserActionsTests(TestCase):
710

    
711
    def test_email_change(self):
712
        # to test existing email validation
713
        get_local_user('existing@grnet.gr')
714

    
715
        # local user
716
        user = get_local_user('kpap@grnet.gr')
717

    
718
        # login as kpap
719
        self.client.login(username='kpap@grnet.gr', password='password')
720
        r = self.client.get('/im/profile', follow=True)
721
        user = r.context['request'].user
722
        self.assertTrue(user.is_authenticated())
723

    
724
        # change email is enabled
725
        r = self.client.get('/im/email_change')
726
        self.assertEqual(r.status_code, 200)
727
        self.assertFalse(user.email_change_is_pending())
728

    
729
        # request email change to an existing email fails
730
        data = {'new_email_address': 'existing@grnet.gr'}
731
        r = self.client.post('/im/email_change', data)
732
        self.assertContains(r, messages.EMAIL_USED)
733

    
734
        # proper email change
735
        data = {'new_email_address': 'kpap@gmail.com'}
736
        r = self.client.post('/im/email_change', data, follow=True)
737
        self.assertRedirects(r, '/im/profile')
738
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
739
        change1 = EmailChange.objects.get()
740

    
741
        # user sees a warning
742
        r = self.client.get('/im/email_change')
743
        self.assertEqual(r.status_code, 200)
744
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
745
        self.assertTrue(user.email_change_is_pending())
746

    
747
        # link was sent
748
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 0)
749
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
750

    
751
        # proper email change
752
        data = {'new_email_address': 'kpap@yahoo.com'}
753
        r = self.client.post('/im/email_change', data, follow=True)
754
        self.assertRedirects(r, '/im/profile')
755
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
756
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 0)
757
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
758
        change2 = EmailChange.objects.get()
759

    
760
        r = self.client.get(change1.get_url())
761
        self.assertEquals(r.status_code, 302)
762
        self.client.logout()
763

    
764
        r = self.client.post('/im/local?next=' + change2.get_url(),
765
                             {'username': 'kpap@grnet.gr',
766
                              'password': 'password',
767
                              'next': change2.get_url()},
768
                             follow=True)
769
        self.assertRedirects(r, '/im/profile')
770
        user = r.context['request'].user
771
        self.assertEquals(user.email, 'kpap@yahoo.com')
772
        self.assertEquals(user.username, 'kpap@yahoo.com')
773

    
774
        self.client.logout()
775
        r = self.client.post('/im/local?next=' + change2.get_url(),
776
                             {'username': 'kpap@grnet.gr',
777
                              'password': 'password',
778
                              'next': change2.get_url()},
779
                             follow=True)
780
        self.assertContains(r, "Please enter a correct username and password")
781
        self.assertEqual(user.emailchanges.count(), 0)
782

    
783

    
784
class TestAuthProviderViews(TestCase):
785

    
786
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'])
787
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
788
    @im_settings(IM_MODULES=['shibboleth', 'local'])
789
    @im_settings(MODERATION_ENABLED=True)
790
    @im_settings(FORCE_PROFILE_UPDATE=False)
791
    def test_user(self):
792
        Profile = AuthProviderPolicyProfile
793
        Pending = PendingThirdPartyUser
794
        User = AstakosUser
795

    
796
        User.objects.create(email="newuser@grnet.gr")
797
        get_local_user("olduser@grnet.gr")
798
        cl_olduser = ShibbolethClient()
799
        get_local_user("olduser2@grnet.gr")
800
        ShibbolethClient()
801
        cl_newuser = ShibbolethClient()
802
        cl_newuser2 = Client()
803

    
804
        academic_group, created = Group.objects.get_or_create(
805
            name='academic-login')
806
        academic_users = academic_group.user_set
807
        assert created
808
        policy_only_academic = Profile.objects.add_policy('academic_strict',
809
                                                          'shibboleth',
810
                                                          academic_group,
811
                                                          exclusive=True,
812
                                                          login=False,
813
                                                          add=False)
814

    
815

    
816
        # new academic user
817
        self.assertFalse(academic_users.filter(email='newuser@grnet.gr'))
818
        cl_newuser.set_tokens(eppn="newusereppn")
819
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
820
        pending = Pending.objects.get()
821
        identifier = pending.third_party_identifier
822
        signup_data = {'third_party_identifier': identifier,
823
                       'first_name': 'Academic',
824
                       'third_party_token': pending.token,
825
                       'last_name': 'New User',
826
                       'provider': 'shibboleth'}
827
        r = cl_newuser.post('/im/signup', signup_data)
828
        self.assertContains(r, "This field is required", )
829
        signup_data['email'] = 'olduser@grnet.gr'
830
        r = cl_newuser.post('/im/signup', signup_data)
831
        self.assertContains(r, "already an account with this email", )
832
        signup_data['email'] = 'newuser@grnet.gr'
833
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
834
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
835
        self.assertEqual(r.status_code, 404)
836
        newuser = User.objects.get(email="newuser@grnet.gr")
837
        activation_link = newuser.get_activation_url()
838
        self.assertTrue(academic_users.get(email='newuser@grnet.gr'))
839

    
840
        # new non-academic user
841
        signup_data = {'first_name': 'Non Academic',
842
                       'last_name': 'New User',
843
                       'provider': 'local',
844
                       'password1': 'password',
845
                       'password2': 'password'}
846
        signup_data['email'] = 'olduser@grnet.gr'
847
        r = cl_newuser2.post('/im/signup', signup_data)
848
        self.assertContains(r, 'There is already an account with this '
849
                               'email address')
850
        signup_data['email'] = 'newuser@grnet.gr'
851
        r = cl_newuser2.post('/im/signup/', signup_data)
852
        self.assertFalse(academic_users.filter(email='newuser@grnet.gr'))
853
        r = self.client.get(activation_link, follow=True)
854
        self.assertEqual(r.status_code, 400)
855
        newuser = User.objects.get(email="newuser@grnet.gr")
856
        self.assertFalse(newuser.activation_sent)
857
        r = self.client.get(newuser.get_activation_url(), follow=True)
858
        self.assertContains(r, "pending moderation")
859

    
860
        self.assertFalse(academic_users.filter(email='newuser@grnet.gr'))
861
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
862
        pending = Pending.objects.get()
863
        identifier = pending.third_party_identifier
864
        signup_data = {'third_party_identifier': identifier,
865
                       'first_name': 'Academic',
866
                       'third_party_token': pending.token,
867
                       'last_name': 'New User',
868
                       'provider': 'shibboleth'}
869
        signup_data['email'] = 'newuser@grnet.gr'
870
        r = cl_newuser.post('/im/signup', signup_data)
871
        newuser = User.objects.get(email="newuser@grnet.gr")
872
        self.assertTrue(newuser.activation_sent)
873
        activation_link = newuser.get_activation_url()
874
        self.assertTrue(academic_users.get(email='newuser@grnet.gr'))
875
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
876
        self.assertRedirects(r, '/im/landing')
877
        newuser = User.objects.get(email="newuser@grnet.gr")
878
        self.assertEqual(newuser.is_active, True)
879
        self.assertEqual(newuser.email_verified, True)
880
        cl_newuser.logout()
881

    
882
        # cannot reactivate if suspended
883
        newuser.is_active = False
884
        newuser.save()
885
        r = cl_newuser.get(newuser.get_activation_url())
886
        newuser = User.objects.get(email="newuser@grnet.gr")
887
        self.assertFalse(newuser.is_active)
888

    
889
        # release suspension
890
        newuser.is_active = True
891
        newuser.save()
892

    
893
        cl_newuser.get('/im/login/shibboleth?', follow=True)
894
        local = auth.get_provider('local', newuser)
895
        self.assertEqual(local.get_add_policy, False)
896
        self.assertEqual(local.get_login_policy, False)
897
        r = cl_newuser.get(local.get_add_url, follow=True)
898
        self.assertRedirects(r, '/im/profile')
899
        self.assertContains(r, 'disabled for your')
900

    
901
        cl_olduser.login(username='olduser@grnet.gr', password="password")
902
        r = cl_olduser.get('/im/profile', follow=True)
903
        self.assertEqual(r.status_code, 200)
904
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
905
        self.assertContains(r, 'Your request is missing a unique token')
906
        cl_olduser.set_tokens(eppn="newusereppn")
907
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
908
        self.assertContains(r, 'is already assigned to another user')
909
        cl_olduser.set_tokens(eppn="oldusereppn")
910
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
911
        self.assertContains(r, 'Academic login enabled for this account')
912

    
913
        user = User.objects.get(email="olduser@grnet.gr")
914
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
915
        local_provider = user.get_auth_provider('local')
916
        self.assertEqual(shib_provider.get_remove_policy, True)
917
        self.assertEqual(local_provider.get_remove_policy, True)
918

    
919

    
920
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
921
                                                          'shibboleth',
922
                                                          academic_group,
923
                                                          remove=False)
924
        user.groups.add(academic_group)
925
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
926
        local_provider = user.get_auth_provider('local')
927
        self.assertEqual(shib_provider.get_remove_policy, False)
928
        self.assertEqual(local_provider.get_remove_policy, True)
929
        self.assertEqual(local_provider.get_login_policy, False)
930

    
931
        cl_olduser.logout()
932
        login_data = {'username': 'olduser@grnet.gr', 'password': 'password'}
933
        r = cl_olduser.post('/im/local', login_data, follow=True)
934
        self.assertContains(r, "href='/im/login/shibboleth'>Academic login")
935

    
936

    
937
class TestAuthProvidersAPI(TestCase):
938
    """
939
    Test auth_providers module API
940
    """
941

    
942
    @im_settings(IM_MODULES=['local', 'shibboleth'])
943
    def test_create(self):
944
        user = AstakosUser.objects.create(email="kpap@grnet.gr")
945
        user2 = AstakosUser.objects.create(email="kpap2@grnet.gr")
946

    
947
        module = 'shibboleth'
948
        identifier = 'SHIB_UUID'
949
        provider_params = {
950
            'affiliation': 'UNIVERSITY',
951
            'info': {'age': 27}
952
        }
953
        provider = auth.get_provider(module, user2, identifier,
954
                                     **provider_params)
955
        provider.add_to_user()
956
        provider = auth.get_provider(module, user, identifier,
957
                                     **provider_params)
958
        provider.add_to_user()
959
        user.email_verified = True
960
        user.save()
961
        self.assertRaises(Exception, provider.add_to_user)
962
        provider = user.get_auth_provider(module, identifier)
963
        self.assertEqual(user.get_auth_provider(
964
            module, identifier)._instance.info.get('age'), 27)
965

    
966
        module = 'local'
967
        identifier = None
968
        provider_params = {'auth_backend': 'ldap', 'info':
969
                          {'office': 'A1'}}
970
        provider = auth.get_provider(module, user, identifier,
971
                                     **provider_params)
972
        provider.add_to_user()
973
        self.assertFalse(provider.get_add_policy)
974
        self.assertRaises(Exception, provider.add_to_user)
975

    
976
        shib = user.get_auth_provider('shibboleth',
977
                                      'SHIB_UUID')
978
        self.assertTrue(shib.get_remove_policy)
979

    
980
        local = user.get_auth_provider('local')
981
        self.assertTrue(local.get_remove_policy)
982

    
983
        local.remove_from_user()
984
        self.assertFalse(shib.get_remove_policy)
985
        self.assertRaises(Exception, shib.remove_from_user)
986

    
987
        provider = user.get_auth_providers()[0]
988
        self.assertRaises(Exception, provider.add_to_user)
989

    
990
    @im_settings(IM_MODULES=['local', 'shibboleth'])
991
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'])
992
    @shibboleth_settings(CREATION_GROUPS_POLICY=['group-create', 'group1',
993
                                                 'group2'])
994
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'])
995
    @localauth_settings(CREATION_GROUPS_POLICY=['localgroup-create',
996
                                                'group-create'])
997
    def test_add_groups(self):
998
        user = AstakosUser.objects.create(email="kpap@grnet.gr")
999
        provider = auth.get_provider('shibboleth', user, 'test123')
1000
        provider.add_to_user()
1001
        user = AstakosUser.objects.get()
1002
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1003
                         sorted([u'group1', u'group2', u'group-create']))
1004

    
1005
        local = auth.get_provider('local', user)
1006
        local.add_to_user()
1007
        provider = user.get_auth_provider('shibboleth')
1008
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1009
        provider.remove_from_user()
1010
        user = AstakosUser.objects.get()
1011
        self.assertEqual(len(user.get_auth_providers()), 1)
1012
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1013
                         sorted([u'group-create', u'localgroup']))
1014

    
1015
        local = user.get_auth_provider('local')
1016
        self.assertRaises(Exception, local.remove_from_user)
1017
        provider = auth.get_provider('shibboleth', user, 'test123')
1018
        provider.add_to_user()
1019
        user = AstakosUser.objects.get()
1020
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1021
                         sorted([u'group-create', u'group1', u'group2',
1022
                                 u'localgroup']))
1023

    
1024
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1025
    def test_policies(self):
1026
        group_old, created = Group.objects.get_or_create(name='olduser')
1027

    
1028
        astakos_settings.MODERATION_ENABLED = True
1029
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1030
            ['academic-user']
1031
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1032
            ['google-user']
1033

    
1034
        user = AstakosUser.objects.create(email="kpap@grnet.gr")
1035
        user.groups.add(group_old)
1036
        user.add_auth_provider('local')
1037

    
1038
        user2 = AstakosUser.objects.create(email="kpap2@grnet.gr")
1039
        user2.add_auth_provider('shibboleth', identifier='shibid')
1040

    
1041
        user3 = AstakosUser.objects.create(email="kpap3@grnet.gr")
1042
        user3.groups.add(group_old)
1043
        user3.add_auth_provider('local')
1044
        user3.add_auth_provider('shibboleth', identifier='1234')
1045

    
1046
        self.assertTrue(user2.groups.get(name='academic-user'))
1047
        self.assertFalse(user2.groups.filter(name='olduser').count())
1048

    
1049
        local = auth_providers.get_provider('local')
1050
        self.assertTrue(local.get_add_policy)
1051

    
1052
        academic_group = Group.objects.get(name='academic-user')
1053
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1054
                                                     academic_group,
1055
                                                     exclusive=True,
1056
                                                     add=False,
1057
                                                     login=False)
1058
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1059
                                                     academic_group,
1060
                                                     exclusive=True,
1061
                                                     login=False,
1062
                                                     add=False)
1063
        # no duplicate entry gets created
1064
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1065

    
1066
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1067
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1068
                                                     user2,
1069
                                                     remove=False)
1070
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1071

    
1072
        local = auth_providers.get_provider('local', user2)
1073
        google = auth_providers.get_provider('google', user2)
1074
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1075
        self.assertTrue(shibboleth.get_login_policy)
1076
        self.assertFalse(shibboleth.get_remove_policy)
1077
        self.assertFalse(local.get_add_policy)
1078
        self.assertFalse(local.get_add_policy)
1079
        self.assertFalse(google.get_add_policy)
1080

    
1081
        user2.groups.remove(Group.objects.get(name='academic-user'))
1082
        self.assertTrue(local.get_add_policy)
1083
        self.assertTrue(google.get_add_policy)
1084
        user2.groups.add(Group.objects.get(name='academic-user'))
1085

    
1086
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1087
                                                     user2,
1088
                                                     exclusive=True,
1089
                                                     add=True)
1090
        self.assertTrue(local.get_add_policy)
1091
        self.assertTrue(google.get_add_policy)
1092

    
1093
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1094
        self.assertFalse(local.get_automoderate_policy)
1095
        self.assertFalse(google.get_automoderate_policy)
1096
        self.assertTrue(shibboleth.get_automoderate_policy)
1097

    
1098
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1099
                  'GOOGLE_ADD_GROUPS_POLICY']:
1100
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1101

    
1102

    
1103
    @shibboleth_settings(CREATE_POLICY=True)
1104
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1105
    def test_create_http(self):
1106
        # this should be wrapped inside a transaction
1107
        user = AstakosUser(email="test@test.com")
1108
        user.save()
1109
        provider = auth_providers.get_provider('shibboleth', user,
1110
                                               'test@academia.test')
1111
        provider.add_to_user()
1112
        user.get_auth_provider('shibboleth', 'test@academia.test')
1113
        provider = auth_providers.get_provider('local', user)
1114
        provider.add_to_user()
1115
        user.get_auth_provider('local')
1116

    
1117
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1118
        user = AstakosUser(email="test2@test.com")
1119
        user.save()
1120
        provider = auth_providers.get_provider('shibboleth', user,
1121
                                               'test@shibboleth.com',
1122
                                               **{'info': {'name':
1123
                                                                'User Test'}})
1124
        self.assertFalse(provider.get_create_policy)
1125
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1126
        self.assertTrue(provider.get_create_policy)
1127
        academic = provider.add_to_user()
1128

    
1129
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1130
    @shibboleth_settings(LIMIT_POLICY=2)
1131
    def test_policies(self):
1132
        user = get_local_user('kpap@grnet.gr')
1133
        user.add_auth_provider('shibboleth', identifier='1234')
1134
        user.add_auth_provider('shibboleth', identifier='12345')
1135

    
1136
        # default limit is 1
1137
        local = user.get_auth_provider('local')
1138
        self.assertEqual(local.get_add_policy, False)
1139

    
1140
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1141
        academic = user.get_auth_provider('shibboleth',
1142
                                          identifier='1234')
1143
        self.assertEqual(academic.get_add_policy, False)
1144
        newacademic = auth_providers.get_provider('shibboleth', user,
1145
                                                  identifier='123456')
1146
        self.assertEqual(newacademic.get_add_policy, True)
1147
        user.add_auth_provider('shibboleth', identifier='123456')
1148
        self.assertEqual(academic.get_add_policy, False)
1149
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1150

    
1151
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1152
    @shibboleth_settings(LIMIT_POLICY=2)
1153
    def test_messages(self):
1154
        user = get_local_user('kpap@grnet.gr')
1155
        user.add_auth_provider('shibboleth', identifier='1234')
1156
        user.add_auth_provider('shibboleth', identifier='12345')
1157
        provider = auth_providers.get_provider('shibboleth')
1158
        self.assertEqual(provider.get_message('title'), 'Academic')
1159
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1160
        # regenerate messages cache
1161
        provider = auth_providers.get_provider('shibboleth')
1162
        self.assertEqual(provider.get_message('title'), 'New title')
1163
        self.assertEqual(provider.get_message('login_title'),
1164
                         'New title LOGIN')
1165
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1166
        self.assertEqual(provider.get_module_icon,
1167
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1168
        self.assertEqual(provider.get_module_medium_icon,
1169
                         settings.MEDIA_URL +
1170
                         'im/auth/icons-medium/shibboleth.png')
1171

    
1172
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1173
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1174
        self.assertEqual(provider.get_method_details_msg,
1175
                         'Account: 12345')
1176
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1177
        self.assertEqual(provider.get_method_details_msg,
1178
                         'Account: 1234')
1179

    
1180
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1181
        self.assertEqual(provider.get_not_active_msg,
1182
                         "'Academic login' is disabled.")
1183

    
1184
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1185
    @shibboleth_settings(LIMIT_POLICY=2)
1186
    def test_templates(self):
1187
        user = get_local_user('kpap@grnet.gr')
1188
        user.add_auth_provider('shibboleth', identifier='1234')
1189
        user.add_auth_provider('shibboleth', identifier='12345')
1190

    
1191
        provider = auth_providers.get_provider('shibboleth')
1192
        self.assertEqual(provider.get_template('login'),
1193
                         'im/auth/shibboleth_login.html')
1194
        provider = auth_providers.get_provider('google')
1195
        self.assertEqual(provider.get_template('login'),
1196
                         'im/auth/generic_login.html')
1197

    
1198

    
1199
class TestProjects(TestCase):
1200
    """
1201
    Test projects.
1202
    """
1203
    def setUp(self):
1204
        # astakos resources
1205
        self.astakos_service = Service.objects.create(name="astakos",
1206
                                                      api_url="/astakos/api/")
1207
        self.resource = Resource.objects.create(name="astakos.pending_app",
1208
                                                uplimit=0,
1209
                                                allow_in_projects=False,
1210
                                                service=self.astakos_service)
1211

    
1212
        # custom service resources
1213
        self.service = Service.objects.create(name="service1",
1214
                                              api_url="http://service.api")
1215
        self.resource = Resource.objects.create(name="service1.resource",
1216
                                                uplimit=100,
1217
                                                service=self.service)
1218
        self.admin = get_local_user("projects-admin@synnefo.org")
1219
        self.admin.uuid = 'uuid1'
1220
        self.admin.save()
1221

    
1222
        self.user = get_local_user("user@synnefo.org")
1223
        self.member = get_local_user("member@synnefo.org")
1224
        self.member2 = get_local_user("member2@synnefo.org")
1225

    
1226
        self.admin_client = get_user_client("projects-admin@synnefo.org")
1227
        self.user_client = get_user_client("user@synnefo.org")
1228
        self.member_client = get_user_client("member@synnefo.org")
1229
        self.member2_client = get_user_client("member2@synnefo.org")
1230

    
1231
        quotas.qh_sync_users(AstakosUser.objects.all())
1232

    
1233
    @im_settings(PROJECT_ADMINS=['uuid1'])
1234
    def test_application_limit(self):
1235
        # user cannot create a project
1236
        r = self.user_client.get(reverse('project_add'), follow=True)
1237
        self.assertRedirects(r, reverse('project_list'))
1238
        self.assertContains(r, "You are not allowed to create a new project")
1239

    
1240
        # but admin can
1241
        r = self.admin_client.get(reverse('project_add'), follow=True)
1242
        self.assertRedirects(r, reverse('project_add'))
1243

    
1244
    @im_settings(PROJECT_ADMINS=['uuid1'])
1245
    def test_allow_in_project(self):
1246
        dfrom = datetime.now()
1247
        dto = datetime.now() + timedelta(days=30)
1248

    
1249
        # astakos.pending_uplimit allow_in_project flag is False
1250
        # we shouldn't be able to create a project application using this
1251
        # resource.
1252
        application_data = {
1253
            'name': 'project.synnefo.org',
1254
            'homepage': 'https://www.synnefo.org',
1255
            'start_date': dfrom.strftime("%Y-%m-%d"),
1256
            'end_date': dto.strftime("%Y-%m-%d"),
1257
            'member_join_policy': 2,
1258
            'member_leave_policy': 1,
1259
            'service1.resource_uplimit': 100,
1260
            'is_selected_service1.resource': "1",
1261
            'astakos.pending_app_uplimit': 100,
1262
            'is_selected_accounts': "1",
1263
            'user': self.user.pk
1264
        }
1265
        form = forms.ProjectApplicationForm(data=application_data)
1266
        # form is invalid
1267
        self.assertEqual(form.is_valid(), False)
1268

    
1269
        del application_data['astakos.pending_app_uplimit']
1270
        del application_data['is_selected_accounts']
1271
        form = forms.ProjectApplicationForm(data=application_data)
1272
        self.assertEqual(form.is_valid(), True)
1273

    
1274
    @im_settings(PROJECT_ADMINS=['uuid1'])
1275
    def test_applications(self):
1276
        # let user have 2 pending applications
1277
        self.user.add_resource_policy('astakos.pending_app', 2)
1278
        quotas.qh_sync_users(AstakosUser.objects.all())
1279

    
1280
        r = self.user_client.get(reverse('project_add'), follow=True)
1281
        self.assertRedirects(r, reverse('project_add'))
1282

    
1283
        # user fills the project application form
1284
        post_url = reverse('project_add') + '?verify=1'
1285
        dfrom = datetime.now()
1286
        dto = datetime.now() + timedelta(days=30)
1287
        application_data = {
1288
            'name': 'project.synnefo.org',
1289
            'homepage': 'https://www.synnefo.org',
1290
            'start_date': dfrom.strftime("%Y-%m-%d"),
1291
            'end_date': dto.strftime("%Y-%m-%d"),
1292
            'member_join_policy': 2,
1293
            'member_leave_policy': 1,
1294
            'service1.resource_uplimit': 100,
1295
            'is_selected_service1.resource': "1",
1296
            'user': self.user.pk
1297
        }
1298
        r = self.user_client.post(post_url, data=application_data, follow=True)
1299
        self.assertEqual(r.status_code, 200)
1300
        self.assertEqual(r.context['form'].is_valid(), True)
1301

    
1302
        # confirm request
1303
        post_url = reverse('project_add') + '?verify=0&edit=0'
1304
        r = self.user_client.post(post_url, data=application_data, follow=True)
1305
        self.assertContains(r, "The project application has been received")
1306
        self.assertRedirects(r, reverse('project_list'))
1307
        self.assertEqual(ProjectApplication.objects.count(), 1)
1308
        app1_id = ProjectApplication.objects.filter().order_by('pk')[0].pk
1309

    
1310
        # create another one
1311
        application_data['name'] = 'project2.synnefo.org'
1312
        r = self.user_client.post(post_url, data=application_data, follow=True)
1313
        app2_id = ProjectApplication.objects.filter().order_by('pk')[1].pk
1314

    
1315
        # no more applications (LIMIT is 2)
1316
        r = self.user_client.get(reverse('project_add'), follow=True)
1317
        self.assertRedirects(r, reverse('project_list'))
1318
        self.assertContains(r, "You are not allowed to create a new project")
1319

    
1320
        # login
1321
        self.admin_client.get(reverse("edit_profile"))
1322
        # admin approves
1323
        r = self.admin_client.post(reverse('project_app_approve',
1324
                                           kwargs={'application_id': app1_id}),
1325
                                   follow=True)
1326
        self.assertEqual(r.status_code, 200)
1327

    
1328
        # project created
1329
        self.assertEqual(Project.objects.count(), 1)
1330

    
1331
        # login
1332
        self.member_client.get(reverse("edit_profile"))
1333
        # cannot join app2 (not approved yet)
1334
        join_url = reverse("project_join", kwargs={'chain_id': app2_id})
1335
        r = self.member_client.post(join_url, follow=True)
1336
        self.assertEqual(r.status_code, 403)
1337

    
1338
        # can join app1
1339
        self.member_client.get(reverse("edit_profile"))
1340
        join_url = reverse("project_join", kwargs={'chain_id': app1_id})
1341
        r = self.member_client.post(join_url, follow=True)
1342
        self.assertEqual(r.status_code, 200)
1343

    
1344
        self.assertEqual(ProjectMembership.objects.count(), 1)
1345

    
1346
        reject_member_url = reverse('project_reject_member',
1347
                                    kwargs={'chain_id': app1_id, 'memb_id':
1348
                                            self.member.pk})
1349
        accept_member_url = reverse('project_accept_member',
1350
                                    kwargs={'chain_id': app1_id, 'memb_id':
1351
                                            self.member.pk})
1352

    
1353
        # only project owner is allowed to reject
1354
        r = self.member_client.post(reject_member_url, follow=True)
1355
        self.assertContains(r, "You do not have the permissions")
1356
        self.assertEqual(r.status_code, 200)
1357

    
1358
        # user (owns project) rejects membership
1359
        r = self.user_client.post(reject_member_url, follow=True)
1360
        self.assertEqual(ProjectMembership.objects.count(), 0)
1361

    
1362
        # user rejoins
1363
        self.member_client.get(reverse("edit_profile"))
1364
        join_url = reverse("project_join", kwargs={'chain_id': app1_id})
1365
        r = self.member_client.post(join_url, follow=True)
1366
        self.assertEqual(r.status_code, 200)
1367
        self.assertEqual(ProjectMembership.objects.count(), 1)
1368

    
1369
        # user (owns project) accepts membership
1370
        r = self.user_client.post(accept_member_url, follow=True)
1371
        self.assertEqual(ProjectMembership.objects.count(), 1)
1372
        membership = ProjectMembership.objects.get()
1373
        self.assertEqual(membership.state, ProjectMembership.ACCEPTED)
1374

    
1375
        user_quotas = quotas.get_users_quotas([self.member])
1376
        resource = 'service1.resource'
1377
        newlimit = user_quotas[self.member.uuid]['system'][resource]['limit']
1378
        # 100 from initial uplimit + 100 from project
1379
        self.assertEqual(newlimit, 200)
1380

    
1381
        remove_member_url = reverse('project_remove_member',
1382
                                    kwargs={'chain_id': app1_id, 'memb_id':
1383
                                            self.member.pk})
1384
        r = self.user_client.post(remove_member_url, follow=True)
1385
        self.assertEqual(r.status_code, 200)
1386

    
1387
        user_quotas = quotas.get_users_quotas([self.member])
1388
        resource = 'service1.resource'
1389
        newlimit = user_quotas[self.member.uuid]['system'][resource]['limit']
1390
        # 200 - 100 from project
1391
        self.assertEqual(newlimit, 100)