Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests.py @ 2a88057d

History | View | Annotate | Download (59.2 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from contextlib import contextmanager
34

    
35
import copy
36
import datetime
37
import functools
38

    
39
from snf_django.utils.testing import with_settings, override_settings
40

    
41
from django.test import TestCase, Client
42
from django.core import mail
43
from django.http import SimpleCookie, HttpRequest, QueryDict
44
from django.utils.importlib import import_module
45

    
46
from astakos.im.activation_backends import *
47
from astakos.im.target.shibboleth import Tokens as ShibbolethTokens
48
from astakos.im.models import *
49
from astakos.im import functions
50
from astakos.im import settings as astakos_settings
51
from astakos.im import forms
52

    
53
from urllib import quote
54
from datetime import timedelta
55

    
56
from astakos.im import messages
57
from astakos.im import auth_providers
58
from astakos.im import quotas
59

    
60
from django.conf import settings
61

    
62

    
63
# set some common settings
64
astakos_settings.EMAILCHANGE_ENABLED = True
65
astakos_settings.RECAPTCHA_ENABLED = False
66

    
67
settings.LOGGING_SETUP['disable_existing_loggers'] = False
68

    
69
# shortcut decorators to override provider settings
70
# e.g. shibboleth_settings(ENABLED=True) will set
71
# ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_ENABLED = True in global synnefo settings
72
prefixes = {'providers': 'AUTH_PROVIDER_',
73
            'shibboleth': 'ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_',
74
            'local': 'ASTAKOS_AUTH_PROVIDER_LOCAL_'}
75
im_settings = functools.partial(with_settings, astakos_settings)
76
shibboleth_settings = functools.partial(with_settings,
77
                                        settings,
78
                                        prefix=prefixes['shibboleth'])
79
localauth_settings = functools.partial(with_settings, settings,
80
                                       prefix=prefixes['local'])
81

    
82

    
83
class AstakosTestClient(Client):
84
    pass
85

    
86

    
87
class ShibbolethClient(AstakosTestClient):
88
    """
89
    A shibboleth agnostic client.
90
    """
91
    VALID_TOKENS = filter(lambda x: not x.startswith("_"),
92
                          dir(ShibbolethTokens))
93

    
94
    def __init__(self, *args, **kwargs):
95
        self.tokens = kwargs.pop('tokens', {})
96
        super(ShibbolethClient, self).__init__(*args, **kwargs)
97

    
98
    def set_tokens(self, **kwargs):
99
        for key, value in kwargs.iteritems():
100
            key = 'SHIB_%s' % key.upper()
101
            if not key in self.VALID_TOKENS:
102
                raise Exception('Invalid shibboleth token')
103

    
104
            self.tokens[key] = value
105

    
106
    def unset_tokens(self, *keys):
107
        for key in keys:
108
            key = 'SHIB_%s' % param.upper()
109
            if key in self.tokens:
110
                del self.tokens[key]
111

    
112
    def reset_tokens(self):
113
        self.tokens = {}
114

    
115
    def get_http_token(self, key):
116
        http_header = getattr(ShibbolethTokens, key)
117
        return http_header
118

    
119
    def request(self, **request):
120
        """
121
        Transform valid shibboleth tokens to http headers
122
        """
123
        for token, value in self.tokens.iteritems():
124
            request[self.get_http_token(token)] = value
125

    
126
        for param in request.keys():
127
            key = 'SHIB_%s' % param.upper()
128
            if key in self.VALID_TOKENS:
129
                request[self.get_http_token(key)] = request[param]
130
                del request[param]
131

    
132
        return super(ShibbolethClient, self).request(**request)
133

    
134

    
135
def get_user_client(username, password="password"):
136
    client = Client()
137
    client.login(username=username, password=password)
138
    return client
139

    
140

    
141
def get_local_user(username, **kwargs):
142
        try:
143
            return AstakosUser.objects.get(email=username)
144
        except:
145
            user_params = {
146
                'username': username,
147
                'email': username,
148
                'is_active': True,
149
                'activation_sent': datetime.now(),
150
                'email_verified': True,
151
                'provider': 'local'
152
            }
153
            user_params.update(kwargs)
154
            user = AstakosUser(**user_params)
155
            user.set_password(kwargs.get('password', 'password'))
156
            user.save()
157
            user.add_auth_provider('local', auth_backend='astakos')
158
            if kwargs.get('is_active', True):
159
                user.is_active = True
160
            else:
161
                user.is_active = False
162
            user.save()
163
            return user
164

    
165

    
166
def get_mailbox(email):
167
    mails = []
168
    for sent_email in mail.outbox:
169
        for recipient in sent_email.recipients():
170
            if email in recipient:
171
                mails.append(sent_email)
172
    return mails
173

    
174

    
175
class ShibbolethTests(TestCase):
176
    """
177
    Testing shibboleth authentication.
178
    """
179

    
180
    fixtures = ['groups']
181

    
182
    def setUp(self):
183
        self.client = ShibbolethClient()
184
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
185
        astakos_settings.MODERATION_ENABLED = True
186

    
187
    @im_settings(FORCE_PROFILE_UPDATE=False)
188
    def test_create_account(self):
189

    
190
        client = ShibbolethClient()
191

    
192
        # shibboleth views validation
193
        # eepn required
194
        r = client.get('/im/login/shibboleth?', follow=True)
195
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
196
            'domain': astakos_settings.BASEURL,
197
            'contact_email': settings.CONTACT_EMAIL
198
        })
199
        client.set_tokens(eppn="kpapeppn")
200

    
201
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
202
        # shibboleth user info required
203
        r = client.get('/im/login/shibboleth?', follow=True)
204
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
205
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
206

    
207
        # shibboleth logged us in
208
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn",
209
                          cn="Kostas Papadimitriou",
210
                          ep_affiliation="Test Affiliation")
211
        r = client.get('/im/login/shibboleth?', follow=True)
212
        token = PendingThirdPartyUser.objects.get().token
213
        self.assertRedirects(r, '/im/signup?third_party_token=%s' % token)
214
        self.assertEqual(r.status_code, 200)
215

    
216
        # a new pending user created
217
        pending_user = PendingThirdPartyUser.objects.get(
218
            third_party_identifier="kpapeppn")
219
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
220
        # keep the token for future use
221
        token = pending_user.token
222
        # from now on no shibboleth headers are sent to the server
223
        client.reset_tokens()
224

    
225
        # this is the old way, it should fail, to avoid pending user take over
226
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
227
        self.assertEqual(r.status_code, 404)
228

    
229
        # this is the signup unique url associated with the pending user
230
        # created
231
        r = client.get('/im/signup/?third_party_token=%s' % token)
232
        identifier = pending_user.third_party_identifier
233
        post_data = {'third_party_identifier': identifier,
234
                     'first_name': 'Kostas',
235
                     'third_party_token': token,
236
                     'last_name': 'Mitroglou',
237
                     'provider': 'shibboleth'}
238

    
239
        signup_url = reverse('signup')
240

    
241
        # invlid email
242
        post_data['email'] = 'kpap'
243
        r = client.post(signup_url, post_data)
244
        self.assertContains(r, token)
245

    
246
        # existing email
247
        existing_user = get_local_user('test@test.com')
248
        post_data['email'] = 'test@test.com'
249
        r = client.post(signup_url, post_data)
250
        self.assertContains(r, messages.EMAIL_USED)
251
        existing_user.delete()
252

    
253
        # and finally a valid signup
254
        post_data['email'] = 'kpap@grnet.gr'
255
        r = client.post(signup_url, post_data, follow=True)
256
        self.assertContains(r, messages.NOTIFICATION_SENT)
257

    
258
        # everything is ok in our db
259
        self.assertEqual(AstakosUser.objects.count(), 1)
260
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
261
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
262

    
263
        # provider info stored
264
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
265
        self.assertEqual(provider.affiliation, 'Test Affiliation')
266
        self.assertEqual(provider.info, {u'email': u'kpap@grnet.gr',
267
                                         u'eppn': u'kpapeppn',
268
                                         u'name': u'Kostas Papadimitriou'})
269

    
270
        # lets login (not activated yet)
271
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn",
272
                          cn="Kostas Papadimitriou", )
273
        r = client.get("/im/login/shibboleth?", follow=True)
274
        self.assertContains(r, 'is pending moderation')
275

    
276
        # admin activates our user
277
        u = AstakosUser.objects.get(username="kpap@grnet.gr")
278
        functions.activate(u)
279
        self.assertEqual(u.is_active, True)
280

    
281
        # we see our profile
282
        r = client.get("/im/login/shibboleth?", follow=True)
283
        self.assertRedirects(r, '/im/landing')
284
        self.assertEqual(r.status_code, 200)
285

    
286
    def test_existing(self):
287
        """
288
        Test adding of third party login to an existing account
289
        """
290

    
291
        # this is our existing user
292
        existing_user = get_local_user('kpap@grnet.gr')
293
        existing_inactive = get_local_user('kpap-inactive@grnet.gr')
294
        existing_inactive.is_active = False
295
        existing_inactive.save()
296

    
297
        existing_unverified = get_local_user('kpap-unverified@grnet.gr')
298
        existing_unverified.is_active = False
299
        existing_unverified.activation_sent = None
300
        existing_unverified.email_verified = False
301
        existing_unverified.is_verified = False
302
        existing_unverified.save()
303

    
304
        client = ShibbolethClient()
305
        # shibboleth logged us in, notice that we use different email
306
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
307
                          cn="Kostas Papadimitriou", )
308
        r = client.get("/im/login/shibboleth?", follow=True)
309

    
310
        # a new pending user created
311
        pending_user = PendingThirdPartyUser.objects.get()
312
        token = pending_user.token
313
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
314
        pending_key = pending_user.token
315
        client.reset_tokens()
316
        self.assertRedirects(r, "/im/signup?third_party_token=%s" % token)
317

    
318
        form = r.context['login_form']
319
        signupdata = copy.copy(form.initial)
320
        signupdata['email'] = 'kpap@grnet.gr'
321
        signupdata['third_party_token'] = token
322
        signupdata['provider'] = 'shibboleth'
323
        signupdata.pop('id', None)
324

    
325
        # the email exists to another user
326
        r = client.post("/im/signup", signupdata)
327
        self.assertContains(r, "There is already an account with this email "
328
                               "address")
329
        # change the case, still cannot create
330
        signupdata['email'] = 'KPAP@grnet.GR'
331
        r = client.post("/im/signup", signupdata)
332
        self.assertContains(r, "There is already an account with this email "
333
                               "address")
334
        # inactive user
335
        signupdata['email'] = 'KPAP-inactive@grnet.GR'
336
        r = client.post("/im/signup", signupdata)
337
        self.assertContains(r, "There is already an account with this email "
338
                               "address")
339

    
340
        # unverified user, this should pass, old entry will be deleted
341
        signupdata['email'] = 'KAPAP-unverified@grnet.GR'
342
        r = client.post("/im/signup", signupdata)
343

    
344
        post_data = {'password': 'password',
345
                     'username': 'kpap@grnet.gr'}
346
        r = client.post('/im/local', post_data, follow=True)
347
        self.assertTrue(r.context['request'].user.is_authenticated())
348
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
349
                          cn="Kostas Papadimitriou", )
350
        r = client.get("/im/login/shibboleth?", follow=True)
351
        self.assertContains(r, "enabled for this account")
352
        client.reset_tokens()
353

    
354
        user = existing_user
355
        self.assertTrue(user.has_auth_provider('shibboleth'))
356
        self.assertTrue(user.has_auth_provider('local',
357
                                               auth_backend='astakos'))
358
        client.logout()
359

    
360
        # look Ma, i can login with both my shibboleth and local account
361
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
362
                          cn="Kostas Papadimitriou")
363
        r = client.get("/im/login/shibboleth?", follow=True)
364
        self.assertTrue(r.context['request'].user.is_authenticated())
365
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
366
        self.assertRedirects(r, '/im/landing')
367
        self.assertEqual(r.status_code, 200)
368
        client.logout()
369
        client.reset_tokens()
370

    
371
        # logged out
372
        r = client.get("/im/profile", follow=True)
373
        self.assertFalse(r.context['request'].user.is_authenticated())
374

    
375
        # login with local account also works
376
        post_data = {'password': 'password',
377
                     'username': 'kpap@grnet.gr'}
378
        r = self.client.post('/im/local', post_data, follow=True)
379
        self.assertTrue(r.context['request'].user.is_authenticated())
380
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
381
        self.assertRedirects(r, '/im/landing')
382
        self.assertEqual(r.status_code, 200)
383

    
384
        # cannot add the same eppn
385
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
386
                          cn="Kostas Papadimitriou", )
387
        r = client.get("/im/login/shibboleth?", follow=True)
388
        self.assertRedirects(r, '/im/landing')
389
        self.assertTrue(r.status_code, 200)
390
        self.assertEquals(existing_user.auth_providers.count(), 2)
391

    
392
        # only one allowed by default
393
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
394
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
395
        prov = auth_providers.get_provider('shibboleth')
396
        r = client.get("/im/login/shibboleth?", follow=True)
397
        self.assertContains(r, "Failed to add")
398
        self.assertRedirects(r, '/im/profile')
399
        self.assertTrue(r.status_code, 200)
400
        self.assertEquals(existing_user.auth_providers.count(), 2)
401
        client.logout()
402
        client.reset_tokens()
403

    
404
        # cannot login with another eppn
405
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppninvalid",
406
                          cn="Kostas Papadimitriou")
407
        r = client.get("/im/login/shibboleth?", follow=True)
408
        self.assertFalse(r.context['request'].user.is_authenticated())
409

    
410
        # cannot
411

    
412
        # lets remove local password
413
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
414
                                       email="kpap@grnet.gr")
415
        remove_local_url = user.get_auth_provider('local').get_remove_url
416
        remove_shibbo_url = user.get_auth_provider('shibboleth',
417
                                                   'kpapeppn').get_remove_url
418
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
419
                          cn="Kostas Papadimtriou")
420
        r = client.get("/im/login/shibboleth?", follow=True)
421
        client.reset_tokens()
422

    
423
        # TODO: this view should use POST
424
        r = client.get(remove_local_url)
425
        # 2 providers left
426
        self.assertEqual(user.auth_providers.count(), 1)
427
        # cannot remove last provider
428
        r = client.get(remove_shibbo_url)
429
        self.assertEqual(r.status_code, 403)
430
        self.client.logout()
431

    
432
        # cannot login using local credentials (notice we use another client)
433
        post_data = {'password': 'password',
434
                     'username': 'kpap@grnet.gr'}
435
        r = self.client.post('/im/local', post_data, follow=True)
436
        self.assertFalse(r.context['request'].user.is_authenticated())
437

    
438
        # we can reenable the local provider by setting a password
439
        r = client.get("/im/password_change", follow=True)
440
        r = client.post("/im/password_change", {'new_password1': '111',
441
                                                'new_password2': '111'},
442
                        follow=True)
443
        user = r.context['request'].user
444
        self.assertTrue(user.has_auth_provider('local'))
445
        self.assertTrue(user.has_auth_provider('shibboleth'))
446
        self.assertTrue(user.check_password('111'))
447
        self.assertTrue(user.has_usable_password())
448
        self.client.logout()
449

    
450
        # now we can login
451
        post_data = {'password': '111',
452
                     'username': 'kpap@grnet.gr'}
453
        r = self.client.post('/im/local', post_data, follow=True)
454
        self.assertTrue(r.context['request'].user.is_authenticated())
455

    
456
        client.reset_tokens()
457

    
458
        # we cannot take over another shibboleth identifier
459
        user2 = get_local_user('another@grnet.gr')
460
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
461
        # login
462
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
463
                          cn="Kostas Papadimitriou")
464
        r = client.get("/im/login/shibboleth?", follow=True)
465
        # try to assign existing shibboleth identifier of another user
466
        client.set_tokens(mail="kpap_second@shibboleth.gr",
467
                          eppn="existingeppn", cn="Kostas Papadimitriou")
468
        r = client.get("/im/login/shibboleth?", follow=True)
469
        self.assertContains(r, "this account is already assigned")
470

    
471

    
472
class TestLocal(TestCase):
473

    
474
    fixtures = ['groups']
475

    
476
    def setUp(self):
477
        settings.ADMINS = (('admin', 'support@cloud.grnet.gr'),)
478
        settings.SERVER_EMAIL = 'no-reply@grnet.gr'
479
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
480
        settings.ASTAKOS_MODERATION_ENABLED = True
481

    
482
    def tearDown(self):
483
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
484

    
485
    def test_no_moderation(self):
486
        # disable moderation
487
        astakos_settings.MODERATION_ENABLED = False
488

    
489
        # create a new user
490
        r = self.client.get("/im/signup")
491
        self.assertEqual(r.status_code, 200)
492
        data = {'email': 'kpap@grnet.gr', 'password1': 'password',
493
                'password2': 'password', 'first_name': 'Kostas',
494
                'last_name': 'Mitroglou', 'provider': 'local'}
495
        r = self.client.post("/im/signup", data)
496

    
497
        # user created
498
        self.assertEqual(AstakosUser.objects.count(), 1)
499
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
500
                                       email="kpap@grnet.gr")
501
        self.assertEqual(user.username, 'kpap@grnet.gr')
502
        self.assertEqual(user.has_auth_provider('local'), True)
503
        self.assertFalse(user.is_active)
504

    
505
        # user (but not admin) gets notified
506
        self.assertEqual(len(get_mailbox('support@cloud.grnet.gr')), 0)
507
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
508
        astakos_settings.MODERATION_ENABLED = True
509

    
510
    def test_email_case(self):
511
        data = {
512
            'email': 'kPap@grnet.gr',
513
            'password1': '1234',
514
            'password2': '1234'
515
        }
516

    
517
        form = forms.LocalUserCreationForm(data)
518
        self.assertTrue(form.is_valid())
519
        user = form.save()
520
        form.store_user(user, {})
521

    
522
        u = AstakosUser.objects.get()
523
        self.assertEqual(u.email, 'kPap@grnet.gr')
524
        self.assertEqual(u.username, 'kpap@grnet.gr')
525
        u.is_active = True
526
        u.email_verified = True
527
        u.save()
528

    
529
        data = {'username': 'kpap@grnet.gr', 'password': '1234'}
530
        login = forms.LoginForm(data=data)
531
        self.assertTrue(login.is_valid())
532

    
533
        data = {'username': 'KpaP@grnet.gr', 'password': '1234'}
534
        login = forms.LoginForm(data=data)
535
        self.assertTrue(login.is_valid())
536

    
537
        data = {
538
            'email': 'kpap@grnet.gr',
539
            'password1': '1234',
540
            'password2': '1234'
541
        }
542
        form = forms.LocalUserCreationForm(data)
543
        self.assertFalse(form.is_valid())
544

    
545
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),))
546
    @im_settings(FORCE_PROFILE_UPDATE=False)
547
    def test_local_provider(self):
548
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
549
        # enable moderation
550
        astakos_settings.MODERATION_ENABLED = True
551

    
552
        # create a user
553
        r = self.client.get("/im/signup")
554
        self.assertEqual(r.status_code, 200)
555
        data = {'email': 'kpap@grnet.gr', 'password1': 'password',
556
                'password2': 'password', 'first_name': 'Kostas',
557
                'last_name': 'Mitroglou', 'provider': 'local'}
558
        r = self.client.post("/im/signup", data)
559

    
560
        # user created
561
        self.assertEqual(AstakosUser.objects.count(), 1)
562
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
563
                                       email="kpap@grnet.gr")
564
        self.assertEqual(user.username, 'kpap@grnet.gr')
565
        self.assertEqual(user.has_auth_provider('local'), True)
566
        self.assertFalse(user.is_active)  # not activated
567
        self.assertFalse(user.email_verified)  # not verified
568
        self.assertFalse(user.activation_sent)  # activation automatically sent
569

    
570
        # admin gets notified and activates the user from the command line
571
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
572
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
573
                                           'password': 'password'})
574
        self.assertContains(r, messages.NOTIFICATION_SENT)
575
        functions.send_activation(user)
576

    
577
        # user activation fields updated and user gets notified via email
578
        user = AstakosUser.objects.get(pk=user.pk)
579
        self.assertTrue(user.activation_sent)
580
        self.assertFalse(user.email_verified)
581
        self.assertFalse(user.is_active)
582
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
583

    
584
        # user forgot she got registered and tries to submit registration
585
        # form. Notice the upper case in email
586
        data = {'email': 'KPAP@grnet.gr', 'password1': 'password',
587
                'password2': 'password', 'first_name': 'Kostas',
588
                'last_name': 'Mitroglou', 'provider': 'local'}
589
        r = self.client.post("/im/signup", data, follow=True)
590
        self.assertRedirects(r, reverse('index'))
591
        self.assertContains(r, messages.NOTIFICATION_SENT)
592

    
593
        user = AstakosUser.objects.get()
594
        functions.send_activation(user)
595

    
596
        # previous user replaced
597
        self.assertTrue(user.activation_sent)
598
        self.assertFalse(user.email_verified)
599
        self.assertFalse(user.is_active)
600
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 1)
601

    
602
        # hmmm, email exists; lets request a password change
603
        r = self.client.get('/im/local/password_reset')
604
        self.assertEqual(r.status_code, 200)
605
        data = {'email': 'kpap@grnet.gr'}
606
        r = self.client.post('/im/local/password_reset', data, follow=True)
607
        # she can't because account is not active yet
608
        self.assertContains(r, 'pending activation')
609

    
610
        # moderation is enabled and an activation email has already been sent
611
        # so user can trigger resend of the activation email
612
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
613
        self.assertContains(r, 'has been sent to your email address.')
614
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 2)
615

    
616
        # also she cannot login
617
        data = {'username': 'kpap@grnet.gr', 'password': 'password'}
618
        r = self.client.post('/im/local', data, follow=True)
619
        self.assertContains(r, 'Resend activation')
620
        self.assertFalse(r.context['request'].user.is_authenticated())
621
        self.assertFalse('_pithos2_a' in self.client.cookies)
622

    
623
        # user sees the message and resends activation
624
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
625
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 3)
626

    
627
        # switch back moderation setting
628
        astakos_settings.MODERATION_ENABLED = True
629
        r = self.client.get(user.get_activation_url(), follow=True)
630
        self.assertRedirects(r, "/im/landing")
631
        r = self.client.get('/im/profile', follow=True)
632
        self.assertTrue(r.context['request'].user.is_authenticated())
633
        self.assertTrue('_pithos2_a' in self.client.cookies)
634
        self.assertContains(r, "KPAP@grnet.gr")
635
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 4)
636

    
637
        user = AstakosUser.objects.get(pk=user.pk)
638
        # user activated and logged in, token cookie set
639
        self.assertTrue(r.context['request'].user.is_authenticated())
640
        self.assertTrue('_pithos2_a' in self.client.cookies)
641
        cookies = self.client.cookies
642
        self.assertTrue(quote(user.auth_token) in
643
                        cookies.get('_pithos2_a').value)
644
        r = self.client.get('/im/logout', follow=True)
645
        r = self.client.get('/im/')
646
        # user logged out, token cookie removed
647
        self.assertFalse(r.context['request'].user.is_authenticated())
648
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
649

    
650
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
651
        del self.client.cookies['_pithos2_a']
652

    
653
        # user can login
654
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
655
                                           'password': 'password'},
656
                             follow=True)
657
        self.assertTrue(r.context['request'].user.is_authenticated())
658
        self.assertTrue('_pithos2_a' in self.client.cookies)
659
        cookies = self.client.cookies
660
        self.assertTrue(quote(user.auth_token) in
661
                        cookies.get('_pithos2_a').value)
662
        self.client.get('/im/logout', follow=True)
663

    
664
        # user forgot password
665
        old_pass = user.password
666
        r = self.client.get('/im/local/password_reset')
667
        self.assertEqual(r.status_code, 200)
668
        r = self.client.post('/im/local/password_reset', {'email':
669
                                                          'kpap@grnet.gr'})
670
        self.assertEqual(r.status_code, 302)
671
        # email sent
672
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 5)
673

    
674
        # user visits change password link
675
        r = self.client.get(user.get_password_reset_url())
676
        r = self.client.post(user.get_password_reset_url(),
677
                             {'new_password1': 'newpass',
678
                              'new_password2': 'newpass'})
679

    
680
        user = AstakosUser.objects.get(pk=user.pk)
681
        self.assertNotEqual(old_pass, user.password)
682

    
683
        # old pass is not usable
684
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
685
                                           'password': 'password'})
686
        self.assertContains(r, 'Please enter a correct username and password')
687
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
688
                                           'password': 'newpass'},
689
                             follow=True)
690
        self.assertTrue(r.context['request'].user.is_authenticated())
691
        self.client.logout()
692

    
693
        # tests of special local backends
694
        user = AstakosUser.objects.get(pk=user.pk)
695
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
696
        user.save()
697

    
698
        # non astakos local backends do not support password reset
699
        r = self.client.get('/im/local/password_reset')
700
        self.assertEqual(r.status_code, 200)
701
        r = self.client.post('/im/local/password_reset', {'email':
702
                                                          'kpap@grnet.gr'})
703
        # she can't because account is not active yet
704
        self.assertContains(r, "Changing password is not")
705

    
706

    
707
class UserActionsTests(TestCase):
708

    
709
    def test_email_change(self):
710
        # to test existing email validation
711
        get_local_user('existing@grnet.gr')
712

    
713
        # local user
714
        user = get_local_user('kpap@grnet.gr')
715

    
716
        # login as kpap
717
        self.client.login(username='kpap@grnet.gr', password='password')
718
        r = self.client.get('/im/profile', follow=True)
719
        user = r.context['request'].user
720
        self.assertTrue(user.is_authenticated())
721

    
722
        # change email is enabled
723
        r = self.client.get('/im/email_change')
724
        self.assertEqual(r.status_code, 200)
725
        self.assertFalse(user.email_change_is_pending())
726

    
727
        # request email change to an existing email fails
728
        data = {'new_email_address': 'existing@grnet.gr'}
729
        r = self.client.post('/im/email_change', data)
730
        self.assertContains(r, messages.EMAIL_USED)
731

    
732
        # proper email change
733
        data = {'new_email_address': 'kpap@gmail.com'}
734
        r = self.client.post('/im/email_change', data, follow=True)
735
        self.assertRedirects(r, '/im/profile')
736
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
737
        change1 = EmailChange.objects.get()
738

    
739
        # user sees a warning
740
        r = self.client.get('/im/email_change')
741
        self.assertEqual(r.status_code, 200)
742
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
743
        self.assertTrue(user.email_change_is_pending())
744

    
745
        # link was sent
746
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 0)
747
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
748

    
749
        # proper email change
750
        data = {'new_email_address': 'kpap@yahoo.com'}
751
        r = self.client.post('/im/email_change', data, follow=True)
752
        self.assertRedirects(r, '/im/profile')
753
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
754
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 0)
755
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
756
        change2 = EmailChange.objects.get()
757

    
758
        r = self.client.get(change1.get_url())
759
        self.assertEquals(r.status_code, 302)
760
        self.client.logout()
761

    
762
        r = self.client.post('/im/local?next=' + change2.get_url(),
763
                             {'username': 'kpap@grnet.gr',
764
                              'password': 'password',
765
                              'next': change2.get_url()},
766
                             follow=True)
767
        self.assertRedirects(r, '/im/profile')
768
        user = r.context['request'].user
769
        self.assertEquals(user.email, 'kpap@yahoo.com')
770
        self.assertEquals(user.username, 'kpap@yahoo.com')
771

    
772
        self.client.logout()
773
        r = self.client.post('/im/local?next=' + change2.get_url(),
774
                             {'username': 'kpap@grnet.gr',
775
                              'password': 'password',
776
                              'next': change2.get_url()},
777
                             follow=True)
778
        self.assertContains(r, "Please enter a correct username and password")
779
        self.assertEqual(user.emailchanges.count(), 0)
780

    
781

    
782
class TestAuthProviderViews(TestCase):
783

    
784
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'])
785
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
786
    @im_settings(IM_MODULES=['shibboleth', 'local'])
787
    @im_settings(MODERATION_ENABLED=True)
788
    @im_settings(FORCE_PROFILE_UPDATE=False)
789
    def test_user(self):
790
        Profile = AuthProviderPolicyProfile
791
        Pending = PendingThirdPartyUser
792
        User = AstakosUser
793

    
794
        User.objects.create(email="newuser@grnet.gr")
795
        get_local_user("olduser@grnet.gr")
796
        cl_olduser = ShibbolethClient()
797
        get_local_user("olduser2@grnet.gr")
798
        ShibbolethClient()
799
        cl_newuser = ShibbolethClient()
800
        cl_newuser2 = Client()
801

    
802
        academic_group, created = Group.objects.get_or_create(
803
            name='academic-login')
804
        academic_users = academic_group.user_set
805
        assert created
806
        policy_only_academic = Profile.objects.add_policy('academic_strict',
807
                                                          'shibboleth',
808
                                                          academic_group,
809
                                                          exclusive=True,
810
                                                          login=False,
811
                                                          add=False)
812

    
813

    
814
        # new academic user
815
        self.assertFalse(academic_users.filter(email='newuser@grnet.gr'))
816
        cl_newuser.set_tokens(eppn="newusereppn")
817
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
818
        pending = Pending.objects.get()
819
        identifier = pending.third_party_identifier
820
        signup_data = {'third_party_identifier': identifier,
821
                       'first_name': 'Academic',
822
                       'third_party_token': pending.token,
823
                       'last_name': 'New User',
824
                       'provider': 'shibboleth'}
825
        r = cl_newuser.post('/im/signup', signup_data)
826
        self.assertContains(r, "This field is required", )
827
        signup_data['email'] = 'olduser@grnet.gr'
828
        r = cl_newuser.post('/im/signup', signup_data)
829
        self.assertContains(r, "already an account with this email", )
830
        signup_data['email'] = 'newuser@grnet.gr'
831
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
832
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
833
        self.assertEqual(r.status_code, 404)
834
        newuser = User.objects.get(email="newuser@grnet.gr")
835
        activation_link = newuser.get_activation_url()
836
        self.assertTrue(academic_users.get(email='newuser@grnet.gr'))
837

    
838
        # new non-academic user
839
        signup_data = {'first_name': 'Non Academic',
840
                       'last_name': 'New User',
841
                       'provider': 'local',
842
                       'password1': 'password',
843
                       'password2': 'password'}
844
        signup_data['email'] = 'olduser@grnet.gr'
845
        r = cl_newuser2.post('/im/signup', signup_data)
846
        self.assertContains(r, 'There is already an account with this '
847
                               'email address')
848
        signup_data['email'] = 'newuser@grnet.gr'
849
        r = cl_newuser2.post('/im/signup/', signup_data)
850
        self.assertFalse(academic_users.filter(email='newuser@grnet.gr'))
851
        r = self.client.get(activation_link, follow=True)
852
        self.assertEqual(r.status_code, 400)
853
        newuser = User.objects.get(email="newuser@grnet.gr")
854
        self.assertFalse(newuser.activation_sent)
855
        r = self.client.get(newuser.get_activation_url(), follow=True)
856
        self.assertContains(r, "pending moderation")
857

    
858
        self.assertFalse(academic_users.filter(email='newuser@grnet.gr'))
859
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
860
        pending = Pending.objects.get()
861
        identifier = pending.third_party_identifier
862
        signup_data = {'third_party_identifier': identifier,
863
                       'first_name': 'Academic',
864
                       'third_party_token': pending.token,
865
                       'last_name': 'New User',
866
                       'provider': 'shibboleth'}
867
        signup_data['email'] = 'newuser@grnet.gr'
868
        r = cl_newuser.post('/im/signup', signup_data)
869
        newuser = User.objects.get(email="newuser@grnet.gr")
870
        self.assertTrue(newuser.activation_sent)
871
        activation_link = newuser.get_activation_url()
872
        self.assertTrue(academic_users.get(email='newuser@grnet.gr'))
873
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
874
        self.assertRedirects(r, '/im/landing')
875
        newuser = User.objects.get(email="newuser@grnet.gr")
876
        self.assertEqual(newuser.is_active, True)
877
        self.assertEqual(newuser.email_verified, True)
878
        cl_newuser.logout()
879

    
880
        # cannot reactivate if suspended
881
        newuser.is_active = False
882
        newuser.save()
883
        r = cl_newuser.get(newuser.get_activation_url())
884
        newuser = User.objects.get(email="newuser@grnet.gr")
885
        self.assertFalse(newuser.is_active)
886

    
887
        # release suspension
888
        newuser.is_active = True
889
        newuser.save()
890

    
891
        cl_newuser.get('/im/login/shibboleth?', follow=True)
892
        local = auth.get_provider('local', newuser)
893
        self.assertEqual(local.get_add_policy, False)
894
        self.assertEqual(local.get_login_policy, False)
895
        r = cl_newuser.get(local.get_add_url, follow=True)
896
        self.assertRedirects(r, '/im/profile')
897
        self.assertContains(r, 'disabled for your')
898

    
899
        cl_olduser.login(username='olduser@grnet.gr', password="password")
900
        r = cl_olduser.get('/im/profile', follow=True)
901
        self.assertEqual(r.status_code, 200)
902
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
903
        self.assertContains(r, 'Your request is missing a unique token')
904
        cl_olduser.set_tokens(eppn="newusereppn")
905
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
906
        self.assertContains(r, 'is already assigned to another user')
907
        cl_olduser.set_tokens(eppn="oldusereppn")
908
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
909
        self.assertContains(r, 'Academic login enabled for this account')
910

    
911
        user = User.objects.get(email="olduser@grnet.gr")
912
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
913
        local_provider = user.get_auth_provider('local')
914
        self.assertEqual(shib_provider.get_remove_policy, True)
915
        self.assertEqual(local_provider.get_remove_policy, True)
916

    
917

    
918
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
919
                                                          'shibboleth',
920
                                                          academic_group,
921
                                                          remove=False)
922
        user.groups.add(academic_group)
923
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
924
        local_provider = user.get_auth_provider('local')
925
        self.assertEqual(shib_provider.get_remove_policy, False)
926
        self.assertEqual(local_provider.get_remove_policy, True)
927
        self.assertEqual(local_provider.get_login_policy, False)
928

    
929
        cl_olduser.logout()
930
        login_data = {'username': 'olduser@grnet.gr', 'password': 'password'}
931
        r = cl_olduser.post('/im/local', login_data, follow=True)
932
        self.assertContains(r, "href='/im/login/shibboleth'>Academic login")
933

    
934

    
935
class TestAuthProvidersAPI(TestCase):
936
    """
937
    Test auth_providers module API
938
    """
939

    
940
    @im_settings(IM_MODULES=['local', 'shibboleth'])
941
    def test_create(self):
942
        user = AstakosUser.objects.create(email="kpap@grnet.gr")
943
        user2 = AstakosUser.objects.create(email="kpap2@grnet.gr")
944

    
945
        module = 'shibboleth'
946
        identifier = 'SHIB_UUID'
947
        provider_params = {
948
            'affiliation': 'UNIVERSITY',
949
            'info': {'age': 27}
950
        }
951
        provider = auth.get_provider(module, user2, identifier,
952
                                     **provider_params)
953
        provider.add_to_user()
954
        provider = auth.get_provider(module, user, identifier,
955
                                     **provider_params)
956
        provider.add_to_user()
957
        user.email_verified = True
958
        user.save()
959
        self.assertRaises(Exception, provider.add_to_user)
960
        provider = user.get_auth_provider(module, identifier)
961
        self.assertEqual(user.get_auth_provider(
962
            module, identifier)._instance.info.get('age'), 27)
963

    
964
        module = 'local'
965
        identifier = None
966
        provider_params = {'auth_backend': 'ldap', 'info':
967
                          {'office': 'A1'}}
968
        provider = auth.get_provider(module, user, identifier,
969
                                     **provider_params)
970
        provider.add_to_user()
971
        self.assertFalse(provider.get_add_policy)
972
        self.assertRaises(Exception, provider.add_to_user)
973

    
974
        shib = user.get_auth_provider('shibboleth',
975
                                      'SHIB_UUID')
976
        self.assertTrue(shib.get_remove_policy)
977

    
978
        local = user.get_auth_provider('local')
979
        self.assertTrue(local.get_remove_policy)
980

    
981
        local.remove_from_user()
982
        self.assertFalse(shib.get_remove_policy)
983
        self.assertRaises(Exception, shib.remove_from_user)
984

    
985
        provider = user.get_auth_providers()[0]
986
        self.assertRaises(Exception, provider.add_to_user)
987

    
988
    @im_settings(IM_MODULES=['local', 'shibboleth'])
989
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'])
990
    @shibboleth_settings(CREATION_GROUPS_POLICY=['group-create', 'group1',
991
                                                 'group2'])
992
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'])
993
    @localauth_settings(CREATION_GROUPS_POLICY=['localgroup-create',
994
                                                'group-create'])
995
    def test_add_groups(self):
996
        user = AstakosUser.objects.create(email="kpap@grnet.gr")
997
        provider = auth.get_provider('shibboleth', user, 'test123')
998
        provider.add_to_user()
999
        user = AstakosUser.objects.get()
1000
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1001
                         sorted([u'group1', u'group2', u'group-create']))
1002

    
1003
        local = auth.get_provider('local', user)
1004
        local.add_to_user()
1005
        provider = user.get_auth_provider('shibboleth')
1006
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1007
        provider.remove_from_user()
1008
        user = AstakosUser.objects.get()
1009
        self.assertEqual(len(user.get_auth_providers()), 1)
1010
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1011
                         sorted([u'group-create', u'localgroup']))
1012

    
1013
        local = user.get_auth_provider('local')
1014
        self.assertRaises(Exception, local.remove_from_user)
1015
        provider = auth.get_provider('shibboleth', user, 'test123')
1016
        provider.add_to_user()
1017
        user = AstakosUser.objects.get()
1018
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1019
                         sorted([u'group-create', u'group1', u'group2',
1020
                                 u'localgroup']))
1021

    
1022
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1023
    def test_policies(self):
1024
        group_old, created = Group.objects.get_or_create(name='olduser')
1025

    
1026
        astakos_settings.MODERATION_ENABLED = True
1027
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1028
            ['academic-user']
1029
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1030
            ['google-user']
1031

    
1032
        user = AstakosUser.objects.create(email="kpap@grnet.gr")
1033
        user.groups.add(group_old)
1034
        user.add_auth_provider('local')
1035

    
1036
        user2 = AstakosUser.objects.create(email="kpap2@grnet.gr")
1037
        user2.add_auth_provider('shibboleth', identifier='shibid')
1038

    
1039
        user3 = AstakosUser.objects.create(email="kpap3@grnet.gr")
1040
        user3.groups.add(group_old)
1041
        user3.add_auth_provider('local')
1042
        user3.add_auth_provider('shibboleth', identifier='1234')
1043

    
1044
        self.assertTrue(user2.groups.get(name='academic-user'))
1045
        self.assertFalse(user2.groups.filter(name='olduser').count())
1046

    
1047
        local = auth_providers.get_provider('local')
1048
        self.assertTrue(local.get_add_policy)
1049

    
1050
        academic_group = Group.objects.get(name='academic-user')
1051
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1052
                                                     academic_group,
1053
                                                     exclusive=True,
1054
                                                     add=False,
1055
                                                     login=False)
1056
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1057
                                                     academic_group,
1058
                                                     exclusive=True,
1059
                                                     login=False,
1060
                                                     add=False)
1061
        # no duplicate entry gets created
1062
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1063

    
1064
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1065
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1066
                                                     user2,
1067
                                                     remove=False)
1068
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1069

    
1070
        local = auth_providers.get_provider('local', user2)
1071
        google = auth_providers.get_provider('google', user2)
1072
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1073
        self.assertTrue(shibboleth.get_login_policy)
1074
        self.assertFalse(shibboleth.get_remove_policy)
1075
        self.assertFalse(local.get_add_policy)
1076
        self.assertFalse(local.get_add_policy)
1077
        self.assertFalse(google.get_add_policy)
1078

    
1079
        user2.groups.remove(Group.objects.get(name='academic-user'))
1080
        self.assertTrue(local.get_add_policy)
1081
        self.assertTrue(google.get_add_policy)
1082
        user2.groups.add(Group.objects.get(name='academic-user'))
1083

    
1084
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1085
                                                     user2,
1086
                                                     exclusive=True,
1087
                                                     add=True)
1088
        self.assertTrue(local.get_add_policy)
1089
        self.assertTrue(google.get_add_policy)
1090

    
1091
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1092
        self.assertFalse(local.get_automoderate_policy)
1093
        self.assertFalse(google.get_automoderate_policy)
1094
        self.assertTrue(shibboleth.get_automoderate_policy)
1095

    
1096
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1097
                  'GOOGLE_ADD_GROUPS_POLICY']:
1098
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1099

    
1100

    
1101
    @shibboleth_settings(CREATE_POLICY=True)
1102
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1103
    def test_create_http(self):
1104
        # this should be wrapped inside a transaction
1105
        user = AstakosUser(email="test@test.com")
1106
        user.save()
1107
        provider = auth_providers.get_provider('shibboleth', user,
1108
                                               'test@academia.test')
1109
        provider.add_to_user()
1110
        user.get_auth_provider('shibboleth', 'test@academia.test')
1111
        provider = auth_providers.get_provider('local', user)
1112
        provider.add_to_user()
1113
        user.get_auth_provider('local')
1114

    
1115
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1116
        user = AstakosUser(email="test2@test.com")
1117
        user.save()
1118
        provider = auth_providers.get_provider('shibboleth', user,
1119
                                               'test@shibboleth.com',
1120
                                               **{'info': {'name':
1121
                                                                'User Test'}})
1122
        self.assertFalse(provider.get_create_policy)
1123
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1124
        self.assertTrue(provider.get_create_policy)
1125
        academic = provider.add_to_user()
1126

    
1127
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1128
    @shibboleth_settings(LIMIT_POLICY=2)
1129
    def test_policies(self):
1130
        user = get_local_user('kpap@grnet.gr')
1131
        user.add_auth_provider('shibboleth', identifier='1234')
1132
        user.add_auth_provider('shibboleth', identifier='12345')
1133

    
1134
        # default limit is 1
1135
        local = user.get_auth_provider('local')
1136
        self.assertEqual(local.get_add_policy, False)
1137

    
1138
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1139
        academic = user.get_auth_provider('shibboleth',
1140
                                          identifier='1234')
1141
        self.assertEqual(academic.get_add_policy, False)
1142
        newacademic = auth_providers.get_provider('shibboleth', user,
1143
                                                  identifier='123456')
1144
        self.assertEqual(newacademic.get_add_policy, True)
1145
        user.add_auth_provider('shibboleth', identifier='123456')
1146
        self.assertEqual(academic.get_add_policy, False)
1147
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1148

    
1149
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1150
    @shibboleth_settings(LIMIT_POLICY=2)
1151
    def test_messages(self):
1152
        user = get_local_user('kpap@grnet.gr')
1153
        user.add_auth_provider('shibboleth', identifier='1234')
1154
        user.add_auth_provider('shibboleth', identifier='12345')
1155
        provider = auth_providers.get_provider('shibboleth')
1156
        self.assertEqual(provider.get_message('title'), 'Academic')
1157
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1158
        # regenerate messages cache
1159
        provider = auth_providers.get_provider('shibboleth')
1160
        self.assertEqual(provider.get_message('title'), 'New title')
1161
        self.assertEqual(provider.get_message('login_title'),
1162
                         'New title LOGIN')
1163
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1164
        self.assertEqual(provider.get_module_icon,
1165
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1166
        self.assertEqual(provider.get_module_medium_icon,
1167
                         settings.MEDIA_URL +
1168
                         'im/auth/icons-medium/shibboleth.png')
1169

    
1170
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1171
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1172
        self.assertEqual(provider.get_method_details_msg,
1173
                         'Account: 12345')
1174
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1175
        self.assertEqual(provider.get_method_details_msg,
1176
                         'Account: 1234')
1177

    
1178
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1179
        self.assertEqual(provider.get_not_active_msg,
1180
                         "'Academic login' is disabled.")
1181

    
1182
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1183
    @shibboleth_settings(LIMIT_POLICY=2)
1184
    def test_templates(self):
1185
        user = get_local_user('kpap@grnet.gr')
1186
        user.add_auth_provider('shibboleth', identifier='1234')
1187
        user.add_auth_provider('shibboleth', identifier='12345')
1188

    
1189
        provider = auth_providers.get_provider('shibboleth')
1190
        self.assertEqual(provider.get_template('login'),
1191
                         'im/auth/shibboleth_login.html')
1192
        provider = auth_providers.get_provider('google')
1193
        self.assertEqual(provider.get_template('login'),
1194
                         'im/auth/generic_login.html')
1195

    
1196

    
1197
class TestProjects(TestCase):
1198
    """
1199
    Test projects.
1200
    """
1201
    def setUp(self):
1202
        # astakos resources
1203
        self.astakos_service = Service.objects.create(name="astakos",
1204
                                                      api_url="/astakos/api/")
1205
        self.resource = Resource.objects.create(name="astakos.pending_app",
1206
                                                uplimit=0,
1207
                                                allow_in_projects=False,
1208
                                                service=self.astakos_service)
1209

    
1210
        # custom service resources
1211
        self.service = Service.objects.create(name="service1",
1212
                                              api_url="http://service.api")
1213
        self.resource = Resource.objects.create(name="service1.resource",
1214
                                                uplimit=100,
1215
                                                service=self.service)
1216
        self.admin = get_local_user("projects-admin@synnefo.org")
1217
        self.admin.uuid = 'uuid1'
1218
        self.admin.save()
1219

    
1220
        self.user = get_local_user("user@synnefo.org")
1221
        self.member = get_local_user("member@synnefo.org")
1222
        self.member2 = get_local_user("member2@synnefo.org")
1223

    
1224
        self.admin_client = get_user_client("projects-admin@synnefo.org")
1225
        self.user_client = get_user_client("user@synnefo.org")
1226
        self.member_client = get_user_client("member@synnefo.org")
1227
        self.member2_client = get_user_client("member2@synnefo.org")
1228

    
1229
        quotas.qh_sync_users(AstakosUser.objects.all())
1230

    
1231
    @im_settings(PROJECT_ADMINS=['uuid1'])
1232
    def test_application_limit(self):
1233
        # user cannot create a project
1234
        r = self.user_client.get(reverse('project_add'), follow=True)
1235
        self.assertRedirects(r, reverse('project_list'))
1236
        self.assertContains(r, "You are not allowed to create a new project")
1237

    
1238
        # but admin can
1239
        r = self.admin_client.get(reverse('project_add'), follow=True)
1240
        self.assertRedirects(r, reverse('project_add'))
1241

    
1242
    @im_settings(PROJECT_ADMINS=['uuid1'])
1243
    def test_allow_in_project(self):
1244
        dfrom = datetime.now()
1245
        dto = datetime.now() + timedelta(days=30)
1246

    
1247
        # astakos.pending_uplimit allow_in_project flag is False
1248
        # we shouldn't be able to create a project application using this
1249
        # resource.
1250
        application_data = {
1251
            'name': 'project.synnefo.org',
1252
            'homepage': 'https://www.synnefo.org',
1253
            'start_date': dfrom.strftime("%Y-%m-%d"),
1254
            'end_date': dto.strftime("%Y-%m-%d"),
1255
            'member_join_policy': 2,
1256
            'member_leave_policy': 1,
1257
            'service1.resource_uplimit': 100,
1258
            'is_selected_service1.resource': "1",
1259
            'astakos.pending_app_uplimit': 100,
1260
            'is_selected_accounts': "1",
1261
            'user': self.user.pk
1262
        }
1263
        form = forms.ProjectApplicationForm(data=application_data)
1264
        # form is invalid
1265
        self.assertEqual(form.is_valid(), False)
1266

    
1267
        del application_data['astakos.pending_app_uplimit']
1268
        del application_data['is_selected_accounts']
1269
        form = forms.ProjectApplicationForm(data=application_data)
1270
        self.assertEqual(form.is_valid(), True)
1271

    
1272
    @im_settings(PROJECT_ADMINS=['uuid1'])
1273
    def test_applications(self):
1274
        # let user have 2 pending applications
1275
        self.user.add_resource_policy('astakos.pending_app', 2)
1276
        quotas.qh_sync_users(AstakosUser.objects.all())
1277

    
1278
        r = self.user_client.get(reverse('project_add'), follow=True)
1279
        self.assertRedirects(r, reverse('project_add'))
1280

    
1281
        # user fills the project application form
1282
        post_url = reverse('project_add') + '?verify=1'
1283
        dfrom = datetime.now()
1284
        dto = datetime.now() + timedelta(days=30)
1285
        application_data = {
1286
            'name': 'project.synnefo.org',
1287
            'homepage': 'https://www.synnefo.org',
1288
            'start_date': dfrom.strftime("%Y-%m-%d"),
1289
            'end_date': dto.strftime("%Y-%m-%d"),
1290
            'member_join_policy': 2,
1291
            'member_leave_policy': 1,
1292
            'service1.resource_uplimit': 100,
1293
            'is_selected_service1.resource': "1",
1294
            'user': self.user.pk
1295
        }
1296
        r = self.user_client.post(post_url, data=application_data, follow=True)
1297
        self.assertEqual(r.status_code, 200)
1298
        self.assertEqual(r.context['form'].is_valid(), True)
1299

    
1300
        # confirm request
1301
        post_url = reverse('project_add') + '?verify=0&edit=0'
1302
        r = self.user_client.post(post_url, data=application_data, follow=True)
1303
        self.assertContains(r, "The project application has been received")
1304
        self.assertRedirects(r, reverse('project_list'))
1305
        self.assertEqual(ProjectApplication.objects.count(), 1)
1306
        app1_id = ProjectApplication.objects.filter().order_by('pk')[0].pk
1307

    
1308
        # create another one
1309
        application_data['name'] = 'project2.synnefo.org'
1310
        r = self.user_client.post(post_url, data=application_data, follow=True)
1311
        app2_id = ProjectApplication.objects.filter().order_by('pk')[1].pk
1312

    
1313
        # no more applications (LIMIT is 2)
1314
        r = self.user_client.get(reverse('project_add'), follow=True)
1315
        self.assertRedirects(r, reverse('project_list'))
1316
        self.assertContains(r, "You are not allowed to create a new project")
1317

    
1318
        # login
1319
        self.admin_client.get(reverse("edit_profile"))
1320
        # admin approves
1321
        r = self.admin_client.post(reverse('project_app_approve',
1322
                                           kwargs={'application_id': app1_id}),
1323
                                   follow=True)
1324
        self.assertEqual(r.status_code, 200)
1325

    
1326
        # project created
1327
        self.assertEqual(Project.objects.count(), 1)
1328

    
1329
        # login
1330
        self.member_client.get(reverse("edit_profile"))
1331
        # cannot join app2 (not approved yet)
1332
        join_url = reverse("project_join", kwargs={'chain_id': app2_id})
1333
        r = self.member_client.post(join_url, follow=True)
1334
        self.assertEqual(r.status_code, 403)
1335

    
1336
        # can join app1
1337
        self.member_client.get(reverse("edit_profile"))
1338
        join_url = reverse("project_join", kwargs={'chain_id': app1_id})
1339
        r = self.member_client.post(join_url, follow=True)
1340
        self.assertEqual(r.status_code, 200)
1341

    
1342
        self.assertEqual(ProjectMembership.objects.count(), 1)
1343

    
1344
        reject_member_url = reverse('project_reject_member',
1345
                                    kwargs={'chain_id': app1_id, 'memb_id':
1346
                                            self.member.pk})
1347
        accept_member_url = reverse('project_accept_member',
1348
                                    kwargs={'chain_id': app1_id, 'memb_id':
1349
                                            self.member.pk})
1350

    
1351
        # only project owner is allowed to reject
1352
        r = self.member_client.post(reject_member_url, follow=True)
1353
        self.assertContains(r, "You do not have the permissions")
1354
        self.assertEqual(r.status_code, 200)
1355

    
1356
        # user (owns project) rejects membership
1357
        r = self.user_client.post(reject_member_url, follow=True)
1358
        self.assertEqual(ProjectMembership.objects.count(), 0)
1359

    
1360
        # user rejoins
1361
        self.member_client.get(reverse("edit_profile"))
1362
        join_url = reverse("project_join", kwargs={'chain_id': app1_id})
1363
        r = self.member_client.post(join_url, follow=True)
1364
        self.assertEqual(r.status_code, 200)
1365
        self.assertEqual(ProjectMembership.objects.count(), 1)
1366

    
1367
        # user (owns project) accepts membership
1368
        r = self.user_client.post(accept_member_url, follow=True)
1369
        self.assertEqual(ProjectMembership.objects.count(), 1)
1370
        membership = ProjectMembership.objects.get()
1371
        self.assertEqual(membership.state, ProjectMembership.ACCEPTED)
1372

    
1373
        user_quotas = quotas.get_users_quotas([self.member])
1374
        resource = 'service1.resource'
1375
        newlimit = user_quotas[self.member.uuid]['system'][resource]['limit']
1376
        # 100 from initial uplimit + 100 from project
1377
        self.assertEqual(newlimit, 200)
1378

    
1379
        remove_member_url = reverse('project_remove_member',
1380
                                    kwargs={'chain_id': app1_id, 'memb_id':
1381
                                            self.member.pk})
1382
        r = self.user_client.post(remove_member_url, follow=True)
1383
        self.assertEqual(r.status_code, 200)
1384

    
1385
        user_quotas = quotas.get_users_quotas([self.member])
1386
        resource = 'service1.resource'
1387
        newlimit = user_quotas[self.member.uuid]['system'][resource]['limit']
1388
        # 200 - 100 from project
1389
        self.assertEqual(newlimit, 100)