Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests.py @ 63836eda

History | View | Annotate | Download (29.5 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import datetime
35

    
36
from django.test import TestCase, Client
37
from django.conf import settings
38
from django.core import mail
39

    
40
from astakos.im.target.shibboleth import Tokens as ShibbolethTokens
41
from astakos.im.models import *
42
from astakos.im import functions
43
from astakos.im import settings as astakos_settings
44
from astakos.im import forms
45

    
46
from urllib import quote
47

    
48
from astakos.im import messages
49

    
50

    
51
astakos_settings.EMAILCHANGE_ENABLED = True
52

    
53
class ShibbolethClient(Client):
54
    """
55
    A shibboleth agnostic client.
56
    """
57
    VALID_TOKENS = filter(lambda x: not x.startswith("_"), dir(ShibbolethTokens))
58

    
59
    def __init__(self, *args, **kwargs):
60
        self.tokens = kwargs.pop('tokens', {})
61
        super(ShibbolethClient, self).__init__(*args, **kwargs)
62

    
63
    def set_tokens(self, **kwargs):
64
        for key, value in kwargs.iteritems():
65
            key = 'SHIB_%s' % key.upper()
66
            if not key in self.VALID_TOKENS:
67
                raise Exception('Invalid shibboleth token')
68

    
69
            self.tokens[key] = value
70

    
71
    def unset_tokens(self, *keys):
72
        for key in keys:
73
            key = 'SHIB_%s' % param.upper()
74
            if key in self.tokens:
75
                del self.tokens[key]
76

    
77
    def reset_tokens(self):
78
        self.tokens = {}
79

    
80
    def get_http_token(self, key):
81
        http_header = getattr(ShibbolethTokens, key)
82
        return http_header
83

    
84
    def request(self, **request):
85
        """
86
        Transform valid shibboleth tokens to http headers
87
        """
88
        for token, value in self.tokens.iteritems():
89
            request[self.get_http_token(token)] = value
90

    
91
        for param in request.keys():
92
            key = 'SHIB_%s' % param.upper()
93
            if key in self.VALID_TOKENS:
94
                request[self.get_http_token(key)] = request[param]
95
                del request[param]
96

    
97
        return super(ShibbolethClient, self).request(**request)
98

    
99

    
100
def get_local_user(username, **kwargs):
101
        try:
102
            return AstakosUser.objects.get(email=username)
103
        except:
104
            user_params = {
105
                'username': username,
106
                'email': username,
107
                'is_active': True,
108
                'activation_sent': datetime.now(),
109
                'email_verified': True,
110
                'provider': 'local'
111
            }
112
            user_params.update(kwargs)
113
            user = AstakosUser(**user_params)
114
            user.set_password(kwargs.get('password', 'password'))
115
            user.save()
116
            user.add_auth_provider('local', auth_backend='astakos')
117
            if kwargs.get('is_active', True):
118
                user.is_active = True
119
            else:
120
                user.is_active = False
121
            user.save()
122
            return user
123

    
124

    
125
def get_mailbox(email):
126
    mails = []
127
    for sent_email in mail.outbox:
128
        for recipient in sent_email.recipients():
129
            if email in recipient:
130
                mails.append(sent_email)
131
    return mails
132

    
133

    
134
class ShibbolethTests(TestCase):
135
    """
136
    Testing shibboleth authentication.
137
    """
138

    
139
    fixtures = ['groups']
140

    
141
    def setUp(self):
142
        kind = GroupKind.objects.create(name="default")
143
        AstakosGroup.objects.create(name="default", kind=kind)
144
        self.client = ShibbolethClient()
145
        settings.ASTAKOS_IM_MODULES = ['local', 'shibboleth']
146
        settings.ASTAKOS_MODERATION_ENABLED = True
147

    
148
    def test_create_account(self):
149

    
150
        client = ShibbolethClient()
151

    
152
        # shibboleth views validation
153
        # eepn required
154
        r = client.get('/im/login/shibboleth?', follow=True)
155
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN)
156
        client.set_tokens(eppn="kpapeppn")
157

    
158
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
159
        # shibboleth user info required
160
        r = client.get('/im/login/shibboleth?', follow=True)
161
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
162
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
163

    
164
        # shibboleth logged us in
165
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn",
166
                          cn="Kostas Papadimitriou",
167
                          ep_affiliation="Test Affiliation")
168
        r = client.get('/im/login/shibboleth?')
169
        self.assertEqual(r.status_code, 200)
170

    
171
        # astakos asks if we want to add shibboleth
172
        self.assertContains(r, "Already have an account?")
173

    
174
        # a new pending user created
175
        pending_user = PendingThirdPartyUser.objects.get(
176
            third_party_identifier="kpapeppn")
177
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
178
        # keep the token for future use
179
        token = pending_user.token
180
        # from now on no shibboleth headers are sent to the server
181
        client.reset_tokens()
182

    
183
        # this is the old way, it should fail, to avoid pending user take over
184
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
185
        self.assertEqual(r.status_code, 404)
186

    
187
        # this is the signup unique url associated with the pending user created
188
        r = client.get('/im/signup/?third_party_token=%s' % token)
189
        form = r.context['form']
190
        post_data = {'third_party_identifier': pending_user.third_party_identifier,
191
                     'first_name': 'Kostas',
192
                     'third_party_token': token,
193
                     'last_name': 'Mitroglou',
194
                     'provider': 'shibboleth'
195
                    }
196

    
197
        # invlid email
198
        post_data['email'] = 'kpap'
199
        r = client.post('/im/signup', post_data)
200
        self.assertContains(r, token)
201

    
202
        # existing email
203
        existing_user = get_local_user('test@test.com')
204
        post_data['email'] = 'test@test.com'
205
        r = client.post('/im/signup', post_data)
206
        self.assertContains(r, messages.EMAIL_USED)
207
        existing_user.delete()
208

    
209
        # and finally a valid signup
210
        post_data['email'] = 'kpap@grnet.gr'
211
        r = client.post('/im/signup', post_data, follow=True)
212
        self.assertContains(r, messages.NOTIFICATION_SENT)
213

    
214
        # everything is ok in our db
215
        self.assertEqual(AstakosUser.objects.count(), 1)
216
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
217
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
218

    
219
        # provider info stored
220
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
221
        self.assertEqual(provider.affiliation, 'Test Affiliation')
222
        self.assertEqual(provider.info, {u'email': u'kpap@grnet.gr',
223
                                         u'eppn': u'kpapeppn',
224
                                         u'name': u'Kostas Papadimitriou'})
225

    
226
        # lets login (not activated yet)
227
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn",
228
                          cn="Kostas Papadimitriou", )
229
        r = client.get("/im/login/shibboleth?", follow=True)
230
        self.assertContains(r, messages.ACCOUNT_PENDING_MODERATION)
231
        r = client.get("/im/profile", follow=True)
232
        self.assertRedirects(r, 'http://testserver/im/?next=%2Fim%2Fprofile')
233

    
234
        # admin activates our user
235
        u = AstakosUser.objects.get(username="kpap@grnet.gr")
236
        functions.activate(u)
237
        self.assertEqual(u.is_active, True)
238

    
239
        # we see our profile
240
        r = client.get("/im/login/shibboleth?", follow=True)
241
        self.assertRedirects(r, '/im/profile')
242
        self.assertEqual(r.status_code, 200)
243

    
244
    def test_existing(self):
245
        """
246
        Test adding of third party login to an existing account
247
        """
248

    
249
        # this is our existing user
250
        existing_user = get_local_user('kpap@grnet.gr')
251

    
252
        client = ShibbolethClient()
253
        # shibboleth logged us in, notice that we use different email
254
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
255
                          cn="Kostas Papadimitriou", )
256
        r = client.get("/im/login/shibboleth?")
257

    
258
        # astakos asks if we want to switch a local account to shibboleth
259
        self.assertContains(r, "Already have an account?")
260

    
261
        # a new pending user created
262
        pending_user = PendingThirdPartyUser.objects.get()
263
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
264
        pending_key = pending_user.token
265
        client.reset_tokens()
266

    
267
        # we choose to add shibboleth to an our existing account
268
        # we get redirected to login page with the pending token set
269
        r = client.get('/im/login?key=%s' % pending_key)
270
        post_data = {'password': 'password',
271
                     'username': 'kpap@grnet.gr',
272
                     'key': pending_key}
273
        r = client.post('/im/local', post_data, follow=True)
274
        self.assertRedirects(r, "/im/profile")
275
        self.assertContains(r, messages.AUTH_PROVIDER_ADDED)
276

    
277
        self.assertTrue(existing_user.has_auth_provider('shibboleth'))
278
        self.assertTrue(existing_user.has_auth_provider('local',
279
                                                        auth_backend='astakos'))
280
        client.logout()
281

    
282
        # check that we cannot assign same third party provide twice
283
        r = client.get('/im/login?key=%s' % pending_key)
284
        post_data = {'password': 'password',
285
                     'username': 'kpap@grnet.gr',
286
                     'key': pending_key}
287
        r = self.client.post('/im/local', post_data, follow=True)
288
        self.assertContains(r, messages.AUTH_PROVIDER_ADD_FAILED)
289
        self.client.logout()
290
        client.logout()
291

    
292
        # look Ma, i can login with both my shibboleth and local account
293
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
294
                          cn="Kostas Papadimitriou")
295
        r = client.get("/im/login/shibboleth?", follow=True)
296
        self.assertTrue(r.context['request'].user.is_authenticated())
297
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
298
        self.assertRedirects(r, '/im/profile')
299
        self.assertEqual(r.status_code, 200)
300
        client.logout()
301
        client.reset_tokens()
302

    
303
        # logged out
304
        r = client.get("/im/profile", follow=True)
305
        self.assertFalse(r.context['request'].user.is_authenticated())
306

    
307
        # login with local account also works
308
        post_data = {'password': 'password',
309
                     'username': 'kpap@grnet.gr'}
310
        r = self.client.post('/im/local', post_data, follow=True)
311
        self.assertTrue(r.context['request'].user.is_authenticated())
312
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
313
        self.assertRedirects(r, '/im/profile')
314
        self.assertEqual(r.status_code, 200)
315

    
316
        # cannot add the same eppn
317
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
318
                          cn="Kostas Papadimitriou", )
319
        r = client.get("/im/login/shibboleth?", follow=True)
320
        self.assertRedirects(r, '/im/profile')
321
        self.assertTrue(r.status_code, 200)
322
        self.assertEquals(existing_user.auth_providers.count(), 2)
323

    
324
        # but can add additional eppn
325
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
326
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
327
        r = client.get("/im/login/shibboleth?", follow=True)
328
        new_provider = existing_user.auth_providers.get(identifier="kpapeppn2")
329
        self.assertRedirects(r, '/im/profile')
330
        self.assertTrue(r.status_code, 200)
331
        self.assertEquals(existing_user.auth_providers.count(), 3)
332
        self.assertEqual(new_provider.affiliation, 'affil2')
333
        client.logout()
334
        client.reset_tokens()
335

    
336
        # cannot login with another eppn
337
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppninvalid",
338
                          cn="Kostas Papadimitriou")
339
        r = client.get("/im/login/shibboleth?", follow=True)
340
        self.assertFalse(r.context['request'].user.is_authenticated())
341

    
342
        # lets remove local password
343
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
344
                                       email="kpap@grnet.gr")
345
        remove_local_url = user.get_provider_remove_url('local')
346
        remove_shibbo_url = user.get_provider_remove_url('shibboleth',
347
                                                         identifier='kpapeppn')
348
        remove_shibbo2_url = user.get_provider_remove_url('shibboleth',
349
                                                         identifier='kpapeppn2')
350
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
351
                          cn="Kostas Papadimtriou")
352
        r = client.get("/im/login/shibboleth?", follow=True)
353
        client.reset_tokens()
354

    
355
        # TODO: this view should use POST
356
        r = client.get(remove_local_url)
357
        # 2 providers left
358
        self.assertEqual(user.auth_providers.count(), 2)
359
        r = client.get(remove_shibbo2_url)
360
        # 1 provider left
361
        self.assertEqual(user.auth_providers.count(), 1)
362
        # cannot remove last provider
363
        r = client.get(remove_shibbo_url)
364
        self.assertEqual(r.status_code, 403)
365
        self.client.logout()
366

    
367
        # cannot login using local credentials (notice we use another client)
368
        post_data = {'password': 'password',
369
                     'username': 'kpap@grnet.gr'}
370
        r = self.client.post('/im/local', post_data, follow=True)
371
        self.assertFalse(r.context['request'].user.is_authenticated())
372

    
373
        # we can reenable the local provider by setting a password
374
        r = client.get("/im/password_change", follow=True)
375
        r = client.post("/im/password_change", {'new_password1':'111',
376
                                                'new_password2': '111'},
377
                        follow=True)
378
        user = r.context['request'].user
379
        self.assertTrue(user.has_auth_provider('local'))
380
        self.assertTrue(user.has_auth_provider('shibboleth'))
381
        self.assertTrue(user.check_password('111'))
382
        self.assertTrue(user.has_usable_password())
383
        self.client.logout()
384

    
385
        # now we can login
386
        post_data = {'password': '111',
387
                     'username': 'kpap@grnet.gr'}
388
        r = self.client.post('/im/local', post_data, follow=True)
389
        self.assertTrue(r.context['request'].user.is_authenticated())
390

    
391
        client.reset_tokens()
392

    
393
        # we cannot take over another shibboleth identifier
394
        user2 = get_local_user('another@grnet.gr')
395
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
396
        # login
397
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
398
                          cn="Kostas Papadimitriou")
399
        r = client.get("/im/login/shibboleth?", follow=True)
400
        # try to assign existing shibboleth identifier of another user
401
        client.set_tokens(mail="kpap_second@shibboleth.gr", eppn="existingeppn",
402
                          cn="Kostas Papadimitriou")
403
        r = client.get("/im/login/shibboleth?", follow=True)
404
        self.assertContains(r, messages.AUTH_PROVIDER_ADD_FAILED)
405
        self.assertContains(r, messages.AUTH_PROVIDER_ADD_EXISTS)
406

    
407

    
408
class LocalUserTests(TestCase):
409

    
410
    fixtures = ['groups']
411

    
412
    def setUp(self):
413
        kind = GroupKind.objects.create(name="default")
414
        AstakosGroup.objects.create(name="default", kind=kind)
415
        from django.conf import settings
416
        settings.ADMINS = (('admin', 'support@cloud.grnet.gr'),)
417
        settings.SERVER_EMAIL = 'no-reply@grnet.gr'
418

    
419
    def test_no_moderation(self):
420
        # disable moderation
421
        astakos_settings.MODERATION_ENABLED = False
422

    
423
        # create a new user
424
        r = self.client.get("/im/signup")
425
        self.assertEqual(r.status_code, 200)
426
        data = {'email':'kpap@grnet.gr', 'password1':'password',
427
                'password2':'password', 'first_name': 'Kostas',
428
                'last_name': 'Mitroglou', 'provider': 'local'}
429
        r = self.client.post("/im/signup", data)
430

    
431
        # user created
432
        self.assertEqual(AstakosUser.objects.count(), 1)
433
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
434
                                       email="kpap@grnet.gr")
435
        self.assertEqual(user.username, 'kpap@grnet.gr')
436
        self.assertEqual(user.has_auth_provider('local'), True)
437
        self.assertFalse(user.is_active)
438

    
439
        # user (but not admin) gets notified
440
        self.assertEqual(len(get_mailbox('support@cloud.grnet.gr')), 0)
441
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
442
        astakos_settings.MODERATION_ENABLED = True
443

    
444
    def test_email_case(self):
445
        data = {
446
          'email': 'kPap@grnet.gr',
447
          'password1': '1234',
448
          'password2': '1234'
449
        }
450

    
451
        form = forms.LocalUserCreationForm(data)
452
        self.assertTrue(form.is_valid())
453
        user = form.save()
454
        form.store_user(user, {})
455

    
456
        u = AstakosUser.objects.get(pk=1)
457
        self.assertEqual(u.email, 'kPap@grnet.gr')
458
        self.assertEqual(u.username, 'kpap@grnet.gr')
459
        u.is_active = True
460
        u.email_verified = True
461
        u.save()
462

    
463
        data = {'username': 'kpap@grnet.gr', 'password': '1234'}
464
        login = forms.LoginForm(data=data)
465
        self.assertTrue(login.is_valid())
466

    
467
        data = {'username': 'KpaP@grnet.gr', 'password': '1234'}
468
        login = forms.LoginForm(data=data)
469
        self.assertTrue(login.is_valid())
470

    
471
        data = {
472
          'email': 'kpap@grnet.gr',
473
          'password1': '1234',
474
          'password2': '1234'
475
        }
476
        form = forms.LocalUserCreationForm(data)
477
        self.assertFalse(form.is_valid())
478

    
479
    def test_local_provider(self):
480
        # enable moderation
481
        astakos_settings.MODERATION_ENABLED = True
482

    
483
        # create a user
484
        r = self.client.get("/im/signup")
485
        self.assertEqual(r.status_code, 200)
486
        data = {'email':'kpap@grnet.gr', 'password1':'password',
487
                'password2':'password', 'first_name': 'Kostas',
488
                'last_name': 'Mitroglou', 'provider': 'local'}
489
        r = self.client.post("/im/signup", data)
490

    
491
        # user created
492
        self.assertEqual(AstakosUser.objects.count(), 1)
493
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
494
                                       email="kpap@grnet.gr")
495
        self.assertEqual(user.username, 'kpap@grnet.gr')
496
        self.assertEqual(user.has_auth_provider('local'), True)
497
        self.assertFalse(user.is_active) # not activated
498
        self.assertFalse(user.email_verified) # not verified
499
        self.assertFalse(user.activation_sent) # activation automatically sent
500

    
501
        # admin gets notified and activates the user from the command line
502
        self.assertEqual(len(get_mailbox('support@cloud.grnet.gr')), 1)
503
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
504
                                                 'password': 'password'})
505
        self.assertContains(r, messages.ACCOUNT_PENDING_MODERATION)
506
        functions.send_activation(user)
507

    
508
        # user activation fields updated and user gets notified via email
509
        user = AstakosUser.objects.get(pk=user.pk)
510
        self.assertTrue(user.activation_sent)
511
        self.assertFalse(user.email_verified)
512
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
513

    
514
        # user forgot she got registered and tries to submit registration
515
        # form. Notice the upper case in email
516
        data = {'email':'KPAP@grnet.gr', 'password1':'password',
517
                'password2':'password', 'first_name': 'Kostas',
518
                'last_name': 'Mitroglou', 'provider': 'local'}
519
        r = self.client.post("/im/signup", data)
520
        self.assertContains(r, messages.EMAIL_USED)
521

    
522
        # hmmm, email exists; lets get the password
523
        r = self.client.get('/im/local/password_reset')
524
        self.assertEqual(r.status_code, 200)
525
        r = self.client.post('/im/local/password_reset', {'email':
526
                                                          'kpap@grnet.gr'})
527
        # she can't because account is not active yet
528
        self.assertContains(r, "doesn't have an associated user account")
529

    
530
        # moderation is enabled so no automatic activation can be send
531
        r = self.client.get('/im/send/activation/%d' % user.pk)
532
        self.assertEqual(r.status_code, 403)
533
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
534

    
535
        # also she cannot login
536
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
537
                                                 'password': 'password'})
538
        self.assertContains(r, messages.ACCOUNT_PENDING_ACTIVATION_HELP)
539
        self.assertContains(r, messages.ACCOUNT_PENDING_ACTIVATION)
540
        self.assertNotContains(r, 'Resend activation')
541
        self.assertFalse(r.context['request'].user.is_authenticated())
542
        self.assertFalse('_pithos2_a' in self.client.cookies)
543

    
544
        # same with disabled moderation
545
        astakos_settings.MODERATION_ENABLED = False
546
        r = self.client.post('/im/local/password_reset', {'email':
547
                                                          'kpap@grnet.gr'})
548
        self.assertContains(r, "doesn't have an associated user account")
549
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
550
                                                 'password': 'password'})
551
        self.assertContains(r, messages.ACCOUNT_PENDING_ACTIVATION)
552
        self.assertContains(r, 'Resend activation')
553
        self.assertFalse(r.context['request'].user.is_authenticated())
554
        self.assertFalse('_pithos2_a' in self.client.cookies)
555

    
556
        # user sees the message and resends activation
557
        r = self.client.get('/im/send/activation/%d' % user.pk)
558
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 2)
559

    
560
        # switch back moderation setting
561
        astakos_settings.MODERATION_ENABLED = True
562
        r = self.client.get(user.get_activation_url(), follow=True)
563
        self.assertRedirects(r, "/im/profile")
564
        self.assertContains(r, "kpap@grnet.gr")
565
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 3)
566

    
567
        user = AstakosUser.objects.get(pk=user.pk)
568
        # user activated and logged in, token cookie set
569
        self.assertTrue(r.context['request'].user.is_authenticated())
570
        self.assertTrue('_pithos2_a' in self.client.cookies)
571
        cookies = self.client.cookies
572
        self.assertTrue(quote(user.auth_token) in cookies.get('_pithos2_a').value)
573
        r = self.client.get('/im/logout', follow=True)
574
        r = self.client.get('/im/')
575
        # user logged out, token cookie removed
576
        self.assertFalse(r.context['request'].user.is_authenticated())
577
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
578
        # https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
579
        del self.client.cookies['_pithos2_a']
580

    
581
        # user can login
582
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
583
                                           'password': 'password'},
584
                                          follow=True)
585
        self.assertTrue(r.context['request'].user.is_authenticated())
586
        self.assertTrue('_pithos2_a' in self.client.cookies)
587
        cookies = self.client.cookies
588
        self.assertTrue(quote(user.auth_token) in cookies.get('_pithos2_a').value)
589
        self.client.get('/im/logout', follow=True)
590

    
591
        # user forgot password
592
        old_pass = user.password
593
        r = self.client.get('/im/local/password_reset')
594
        self.assertEqual(r.status_code, 200)
595
        r = self.client.post('/im/local/password_reset', {'email':
596
                                                          'kpap@grnet.gr'})
597
        self.assertEqual(r.status_code, 302)
598
        # email sent
599
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 4)
600

    
601
        # user visits change password link
602
        r = self.client.get(user.get_password_reset_url())
603
        r = self.client.post(user.get_password_reset_url(),
604
                            {'new_password1':'newpass',
605
                             'new_password2':'newpass'})
606

    
607
        user = AstakosUser.objects.get(pk=user.pk)
608
        self.assertNotEqual(old_pass, user.password)
609

    
610
        # old pass is not usable
611
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
612
                                           'password': 'password'})
613
        self.assertContains(r, 'Please enter a correct username and password')
614
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
615
                                           'password': 'newpass'},
616
                                           follow=True)
617
        self.assertTrue(r.context['request'].user.is_authenticated())
618
        self.client.logout()
619

    
620
        # tests of special local backends
621
        user = AstakosUser.objects.get(pk=user.pk)
622
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
623
        user.save()
624

    
625
        # non astakos local backends do not support password reset
626
        r = self.client.get('/im/local/password_reset')
627
        self.assertEqual(r.status_code, 200)
628
        r = self.client.post('/im/local/password_reset', {'email':
629
                                                          'kpap@grnet.gr'})
630
        # she can't because account is not active yet
631
        self.assertContains(r, "Password change for this account is not"
632
                                " supported")
633

    
634
class UserActionsTests(TestCase):
635

    
636
    def setUp(self):
637
        kind = GroupKind.objects.create(name="default")
638
        AstakosGroup.objects.create(name="default", kind=kind)
639

    
640
    def test_email_change(self):
641
        # to test existing email validation
642
        existing_user = get_local_user('existing@grnet.gr')
643

    
644
        # local user
645
        user = get_local_user('kpap@grnet.gr')
646

    
647
        # login as kpap
648
        self.client.login(username='kpap@grnet.gr', password='password')
649
        r = self.client.get('/im/profile', follow=True)
650
        user = r.context['request'].user
651
        self.assertTrue(user.is_authenticated())
652

    
653
        # change email is enabled
654
        r = self.client.get('/im/email_change')
655
        self.assertEqual(r.status_code, 200)
656
        self.assertFalse(user.email_change_is_pending())
657

    
658
        # request email change to an existing email fails
659
        data = {'new_email_address': 'existing@grnet.gr'}
660
        r = self.client.post('/im/email_change', data)
661
        self.assertContains(r, messages.EMAIL_USED)
662

    
663
        # proper email change
664
        data = {'new_email_address': 'kpap@gmail.com'}
665
        r = self.client.post('/im/email_change', data, follow=True)
666
        self.assertRedirects(r, '/im/profile')
667
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
668
        change1 = EmailChange.objects.get()
669

    
670
        # user sees a warning
671
        r = self.client.get('/im/email_change')
672
        self.assertEqual(r.status_code, 200)
673
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
674
        self.assertTrue(user.email_change_is_pending())
675

    
676
        # link was sent
677
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 0)
678
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
679

    
680
        # proper email change
681
        data = {'new_email_address': 'kpap@yahoo.com'}
682
        r = self.client.post('/im/email_change', data, follow=True)
683
        self.assertRedirects(r, '/im/profile')
684
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
685
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 0)
686
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
687
        change2 = EmailChange.objects.get()
688

    
689
        r = self.client.get(change1.get_url())
690
        self.assertEquals(r.status_code, 302)
691
        self.client.logout()
692

    
693
        r = self.client.post('/im/local?next=' + change2.get_url(),
694
                             {'username': 'kpap@grnet.gr',
695
                              'password': 'password',
696
                              'next': change2.get_url()},
697
                             follow=True)
698
        self.assertRedirects(r, '/im/profile')
699
        user = r.context['request'].user
700
        self.assertEquals(user.email, 'kpap@yahoo.com')
701
        self.assertEquals(user.username, 'kpap@yahoo.com')
702

    
703

    
704
        self.client.logout()
705
        r = self.client.post('/im/local?next=' + change2.get_url(),
706
                             {'username': 'kpap@grnet.gr',
707
                              'password': 'password',
708
                              'next': change2.get_url()},
709
                             follow=True)
710
        self.assertContains(r, "Please enter a correct username and password")
711
        self.assertEqual(user.emailchanges.count(), 0)
712