Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests.py @ 8ab484ea

History | View | Annotate | Download (20.7 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import datetime
35

    
36
from django.test import TestCase, Client
37
from django.conf import settings
38
from django.core import mail
39

    
40
from astakos.im.target.shibboleth import Tokens as ShibbolethTokens
41
from astakos.im.models import *
42
from astakos.im import functions
43
from astakos.im import settings as astakos_settings
44

    
45
from urllib import quote
46

    
47
from astakos.im import messages
48

    
49
class ShibbolethClient(Client):
50
    """
51
    A shibboleth agnostic client.
52
    """
53
    VALID_TOKENS = filter(lambda x: not x.startswith("_"), dir(ShibbolethTokens))
54

    
55
    def __init__(self, *args, **kwargs):
56
        self.tokens = kwargs.pop('tokens', {})
57
        super(ShibbolethClient, self).__init__(*args, **kwargs)
58

    
59
    def set_tokens(self, **kwargs):
60
        for key, value in kwargs.iteritems():
61
            key = 'SHIB_%s' % key.upper()
62
            if not key in self.VALID_TOKENS:
63
                raise Exception('Invalid shibboleth token')
64

    
65
            self.tokens[key] = value
66

    
67
    def unset_tokens(self, *keys):
68
        for key in keys:
69
            key = 'SHIB_%s' % param.upper()
70
            if key in self.tokens:
71
                del self.tokens[key]
72

    
73
    def reset_tokens(self):
74
        self.tokens = {}
75

    
76
    def get_http_token(self, key):
77
        http_header = getattr(ShibbolethTokens, key)
78
        return http_header
79

    
80
    def request(self, **request):
81
        """
82
        Transform valid shibboleth tokens to http headers
83
        """
84
        for token, value in self.tokens.iteritems():
85
            request[self.get_http_token(token)] = value
86

    
87
        for param in request.keys():
88
            key = 'SHIB_%s' % param.upper()
89
            if key in self.VALID_TOKENS:
90
                request[self.get_http_token(key)] = request[param]
91
                del request[param]
92

    
93
        return super(ShibbolethClient, self).request(**request)
94

    
95

    
96
def get_local_user(username, **kwargs):
97
        try:
98
            return AstakosUser.objects.get(email=username)
99
        except:
100
            user_params = {
101
                'username': username,
102
                'email': username,
103
                'is_active': True,
104
                'activation_sent': datetime.now(),
105
                'email_verified': True,
106
                'provider': 'local'
107
            }
108
            user_params.update(kwargs)
109
            user = AstakosUser(**user_params)
110
            user.set_password(kwargs.get('password', 'password'))
111
            user.save()
112
            user.add_auth_provider('local', auth_backend='astakos')
113
            if kwargs.get('is_active', True):
114
                user.is_active = True
115
            else:
116
                user.is_active = False
117
            user.save()
118
            return user
119

    
120

    
121
def get_mailbox(email):
122
    mails = []
123
    for sent_email in mail.outbox:
124
        for recipient in sent_email.recipients():
125
            if email in recipient:
126
                mails.append(sent_email)
127
    return mails
128

    
129

    
130
class ShibbolethTests(TestCase):
131
    """
132
    Testing shibboleth authentication.
133
    """
134

    
135
    fixtures = ['groups']
136

    
137
    def setUp(self):
138
        self.client = ShibbolethClient()
139
        settings.ASTAKOS_IM_MODULES = ['local', 'shibboleth']
140

    
141
    def test_create_account(self):
142
        client = ShibbolethClient()
143

    
144
        # shibboleth views validation
145
        # eepn required
146
        r = client.get('/im/login/shibboleth?', follow=True)
147
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN)
148
        client.set_tokens(eppn="kpapeppn")
149
        # shibboleth user info required
150
        r = client.get('/im/login/shibboleth?', follow=True)
151
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
152

    
153
        # shibboleth logged us in
154
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn", cn="1", )
155
        r = client.get('/im/login/shibboleth?')
156

    
157
        # astakos asks if we want to add shibboleth
158
        self.assertContains(r, "Already have an account?")
159

    
160
        # a new pending user created
161
        pending_user = PendingThirdPartyUser.objects.get(
162
            third_party_identifier="kpapeppn")
163
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
164
        token = pending_user.token
165
        # from now on no shibboleth headers are sent to the server
166
        client.reset_tokens()
167

    
168
        # we choose to signup as a new user
169
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
170
        self.assertEqual(r.status_code, 404)
171

    
172
        r = client.get('/im/shibboleth/signup/%s' % token)
173
        form = r.context['form']
174
        post_data = {'email': 'kpap',
175
                     'third_party_identifier': pending_user.third_party_identifier,
176
                     'first_name': 'Kostas',
177
                     'third_party_token': token,
178
                     'last_name': 'Mitroglou',
179
                     'additional_email': 'kpap@grnet.gr',
180
                     'provider': 'shibboleth'
181
                    }
182
        r = client.post('/im/signup', post_data)
183
        self.assertContains(r, token)
184
        post_data['email'] = 'kpap@grnet.gr'
185
        r = client.post('/im/signup', post_data)
186
        self.assertEqual(r.status_code, 200)
187
        self.assertEqual(AstakosUser.objects.count(), 1)
188
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
189
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
190

    
191

    
192
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn", cn="1", )
193
        r = client.get("/im/login/shibboleth?", follow=True)
194
        self.assertContains(r, "Your request is pending activation")
195
        r = client.get("/im/profile", follow=True)
196
        self.assertRedirects(r, 'http://testserver/im/?next=%2Fim%2Fprofile')
197

    
198
        u = AstakosUser.objects.get()
199
        functions.activate(u)
200
        self.assertEqual(u.is_active, True)
201

    
202
        r = client.get("/im/login/shibboleth?")
203
        self.assertRedirects(r, '/im/profile')
204

    
205
    def test_existing(self):
206
        existing_user = get_local_user('kpap@grnet.gr')
207

    
208
        client = ShibbolethClient()
209
        # shibboleth logged us in, notice that we use different email
210
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn", cn="1", )
211
        r = client.get("/im/login/shibboleth?")
212
        # astakos asks if we want to switch a local account to shibboleth
213
        self.assertContains(r, "Already have an account?")
214

    
215
        # a new pending user created
216
        pending_user = PendingThirdPartyUser.objects.get()
217
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
218
        pending_key = pending_user.token
219
        client.reset_tokens()
220

    
221
        # we choose to add shibboleth to an our existing account
222
        # we get redirected to login page with the pending token set
223
        r = client.get('/im/login?key=%s' % pending_key)
224
        post_data = {'password': 'password',
225
                     'username': 'kpap@grnet.gr',
226
                     'key': pending_key}
227
        r = client.post('/im/local', post_data, follow=True)
228
        self.assertContains(r, "Your new login method has been added")
229

    
230
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
231
                                       email="kpap@grnet.gr")
232
        self.assertTrue(user.has_auth_provider('shibboleth'))
233
        self.assertTrue(user.has_auth_provider('local', auth_backend='astakos'))
234
        client.logout()
235

    
236
        # again ???? show her a message
237
        r = client.get('/im/login?key=%s' % pending_key)
238
        post_data = {'password': 'password',
239
                     'username': 'kpap@grnet.gr',
240
                     'key': pending_key}
241
        r = self.client.post('/im/local', post_data, follow=True)
242
        self.assertContains(r, "Account method assignment failed")
243
        self.client.logout()
244
        client.logout()
245

    
246
        # look Ma, i can login with both my shibboleth and local account
247
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn", cn="1")
248
        r = client.get("/im/login/shibboleth?", follow=True)
249
        self.assertTrue(r.context['request'].user.is_authenticated())
250
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
251
        r = client.get("/im/profile")
252
        self.assertEquals(r.status_code,200)
253
        client.logout()
254
        client.reset_tokens()
255
        r = client.get("/im/profile", follow=True)
256
        self.assertFalse(r.context['request'].user.is_authenticated())
257

    
258
        post_data = {'password': 'password',
259
                     'username': 'kpap@grnet.gr'}
260
        r = self.client.post('/im/local', post_data, follow=True)
261
        self.assertTrue(r.context['request'].user.is_authenticated())
262
        r = self.client.get("/im/profile")
263
        self.assertEquals(r.status_code,200)
264

    
265
        r = client.post('/im/local', post_data, follow=True)
266
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn", cn="1", )
267
        r = client.get("/im/login/shibboleth?", follow=True)
268
        client.reset_tokens()
269

    
270
        client.logout()
271
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppninvalid", cn="1")
272
        r = client.get("/im/login/shibboleth?", follow=True)
273
        self.assertFalse(r.context['request'].user.is_authenticated())
274

    
275
        # lets remove local password
276
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
277
                                       email="kpap@grnet.gr")
278
        provider_pk = user.auth_providers.get(module='local').pk
279
        provider_shib_pk = user.auth_providers.get(module='shibboleth').pk
280
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn", cn="1")
281
        r = client.get("/im/login/shibboleth?", follow=True)
282
        client.reset_tokens()
283
        r = client.get("/im/remove_auth_provider/%d" % provider_pk)
284
        self.assertEqual(user.auth_providers.count(), 1)
285
        r = client.get("/im/remove_auth_provider/%d" % provider_pk)
286
        self.assertEqual(r.status_code, 404)
287
        r = client.get("/im/remove_auth_provider/%d" % provider_shib_pk)
288
        self.assertEqual(r.status_code, 403)
289

    
290
        self.client.logout()
291
        post_data = {'password': 'password',
292
                     'username': 'kpap@grnet.gr'}
293
        r = self.client.post('/im/local', post_data, follow=True)
294
        self.assertFalse(r.context['request'].user.is_authenticated())
295

    
296
        r = client.get("/im/password_change", follow=True)
297
        r = client.post("/im/password_change", {'new_password1':'111',
298
                                                'new_password2': '111'},
299
                        follow=True)
300
        user = r.context['request'].user
301
        self.assertTrue(user.has_auth_provider('local'))
302
        self.assertTrue(user.has_auth_provider('shibboleth'))
303
        self.assertTrue(user.check_password('111'))
304
        self.assertTrue(user.has_usable_password())
305
        self.client.logout()
306
        post_data = {'password': '111',
307
                     'username': 'kpap@grnet.gr'}
308
        r = self.client.post('/im/local', post_data, follow=True)
309
        self.assertTrue(r.context['request'].user.is_authenticated())
310

    
311
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn", cn="1")
312
        r = client.get("/im/login/shibboleth?", follow=True)
313
        r = client.get("/im/login/shibboleth?", follow=True)
314
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
315
                                       email="kpap@grnet.gr")
316

    
317
        user2 = get_local_user('another@grnet.gr')
318
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
319

    
320
        self.assertEqual(user.auth_providers.count(), 2) # local and 1 shibboleth
321
        client.set_tokens(mail="kpap_second@shibboleth.gr", eppn="kpapeppn2", cn="1")
322
        r = client.get("/im/login/shibboleth?", follow=True)
323
        self.assertEqual(user.auth_providers.count(), 3) # local and 2 shibboleth
324

    
325
        client.set_tokens(mail="kpap_second@shibboleth.gr", eppn="kpapeppn2", cn="1")
326
        r = client.get("/im/login/shibboleth?", follow=True)
327

    
328
        client.set_tokens(mail="kpap_second@shibboleth.gr", eppn="existingeppn", cn="1")
329
        r = client.get("/im/login/shibboleth?", follow=True)
330
        self.assertContains(r, 'Account already exists')
331

    
332

    
333
class LocalUserTests(TestCase):
334

    
335
    fixtures = ['groups']
336

    
337
    def setUp(self):
338
        from django.conf import settings
339
        settings.ADMINS = (('admin', 'support@cloud.grnet.gr'),)
340
        settings.SERVER_EMAIL = 'no-reply@grnet.gr'
341

    
342
    def test_invitations(self):
343
        return
344

    
345
    def test_local_provider(self):
346
        r = self.client.get("/im/signup")
347
        self.assertEqual(r.status_code, 200)
348

    
349
        data = {'email':'kpap@grnet.gr', 'password1':'password',
350
                'password2':'password', 'first_name': 'Kostas',
351
                'last_name': 'Mitroglou', 'provider': 'local'}
352
        r = self.client.post("/im/signup", data)
353
        self.assertEqual(AstakosUser.objects.count(), 1)
354
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
355
                                       email="kpap@grnet.gr")
356
        self.assertEqual(user.username, 'kpap@grnet.gr')
357
        self.assertEqual(user.has_auth_provider('local'), True)
358
        self.assertFalse(user.is_active)
359

    
360
        # admin gets notified
361
        self.assertEqual(len(get_mailbox('support@cloud.grnet.gr')), 1)
362
        # and sends user activation email
363
        functions.send_activation(user)
364

    
365
        # user activation fields updated
366
        user = AstakosUser.objects.get(pk=user.pk)
367
        self.assertTrue(user.activation_sent)
368
        self.assertFalse(user.email_verified)
369
        # email sent to user
370
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
371

    
372
        # user forgot she got registered and tries to submit registration
373
        # form. Notice the upper case in email
374
        data = {'email':'KPAP@grnet.gr', 'password1':'password',
375
                'password2':'password', 'first_name': 'Kostas',
376
                'last_name': 'Mitroglou', 'provider': 'local'}
377
        r = self.client.post("/im/signup", data)
378
        self.assertContains(r, messages.EMAIL_USED)
379

    
380
        # hmmm, email exists; lets get the password
381
        r = self.client.get('/im/local/password_reset')
382
        self.assertEqual(r.status_code, 200)
383
        r = self.client.post('/im/local/password_reset', {'email':
384
                                                          'kpap@grnet.gr'})
385
        # she can't because account is not active yet
386
        self.assertContains(r, "doesn't have an associated user account")
387

    
388
        # moderation is enabled so no automatic activation can be send
389
        r = self.client.get('/im/send/activation/%d' % user.pk)
390
        self.assertEqual(r.status_code, 403)
391
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
392
        # also she cannot login
393
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
394
                                                 'password': 'password'})
395
        self.assertContains(r, 'You have not followed the activation link')
396
        self.assertNotContains(r, 'Resend activation')
397
        self.assertFalse(r.context['request'].user.is_authenticated())
398
        self.assertFalse('_pithos2_a' in self.client.cookies)
399

    
400
        # lets disable moderation
401
        astakos_settings.MODERATION_ENABLED = False
402
        r = self.client.post('/im/local/password_reset', {'email':
403
                                                          'kpap@grnet.gr'})
404
        self.assertContains(r, "doesn't have an associated user account")
405
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
406
                                                 'password': 'password'})
407
        self.assertContains(r, 'You have not followed the activation link')
408
        self.assertContains(r, 'Resend activation')
409
        self.assertFalse(r.context['request'].user.is_authenticated())
410
        self.assertFalse('_pithos2_a' in self.client.cookies)
411
        # user sees the message and resends activation
412
        r = self.client.get('/im/send/activation/%d' % user.pk)
413
        # email sent
414
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 2)
415

    
416
        # switch back moderation setting
417
        astakos_settings.MODERATION_ENABLED = True
418
        # lets activate the user
419
        r = self.client.get(user.get_activation_url(), follow=True)
420
        self.assertRedirects(r, "/im/profile")
421
        self.assertContains(r, "kpap@grnet.gr")
422
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 3)
423

    
424
        user = AstakosUser.objects.get(pk=user.pk)
425
        # user activated and logged in, token cookie set
426
        self.assertTrue(r.context['request'].user.is_authenticated())
427
        self.assertTrue('_pithos2_a' in self.client.cookies)
428
        cookies = self.client.cookies
429
        self.assertTrue(quote(user.auth_token) in cookies.get('_pithos2_a').value)
430
        r = self.client.get('/im/logout', follow=True)
431
        r = self.client.get('/im/')
432
        # user logged out, token cookie removed
433
        self.assertFalse(r.context['request'].user.is_authenticated())
434
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
435
        # https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
436
        del self.client.cookies['_pithos2_a']
437

    
438
        # user can login
439
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
440
                                           'password': 'password'},
441
                                          follow=True)
442
        self.assertTrue(r.context['request'].user.is_authenticated())
443
        self.assertTrue('_pithos2_a' in self.client.cookies)
444
        cookies = self.client.cookies
445
        self.assertTrue(quote(user.auth_token) in cookies.get('_pithos2_a').value)
446
        self.client.get('/im/logout', follow=True)
447

    
448
        # user forgot password
449
        old_pass = user.password
450
        r = self.client.get('/im/local/password_reset')
451
        self.assertEqual(r.status_code, 200)
452
        r = self.client.post('/im/local/password_reset', {'email':
453
                                                          'kpap@grnet.gr'})
454
        self.assertEqual(r.status_code, 302)
455
        # email sent
456
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 4)
457

    
458
        # user visits change password link
459
        r = self.client.get(user.get_password_reset_url())
460
        r = self.client.post(user.get_password_reset_url(),
461
                            {'new_password1':'newpass',
462
                             'new_password2':'newpass'})
463

    
464
        user = AstakosUser.objects.get(pk=user.pk)
465
        self.assertNotEqual(old_pass, user.password)
466

    
467
        # old pass is not usable
468
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
469
                                           'password': 'password'})
470
        self.assertContains(r, 'Please enter a correct username and password')
471
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
472
                                           'password': 'newpass'},
473
                                           follow=True)
474
        self.assertTrue(r.context['request'].user.is_authenticated())
475
        self.client.logout()
476

    
477
        # tests of special local backends
478
        user = AstakosUser.objects.get(pk=user.pk)
479
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
480
        user.save()
481

    
482
        # non astakos local backends do not support password reset
483
        r = self.client.get('/im/local/password_reset')
484
        self.assertEqual(r.status_code, 200)
485
        r = self.client.post('/im/local/password_reset', {'email':
486
                                                          'kpap@grnet.gr'})
487
        # she can't because account is not active yet
488
        self.assertContains(r, "Password change for this account is not"
489
                                " supported")
490