Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests.py @ cd339bab

History | View | Annotate | Download (29.6 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import datetime
35

    
36
from django.test import TestCase, Client
37
from django.conf import settings
38
from django.core import mail
39

    
40
from astakos.im.target.shibboleth import Tokens as ShibbolethTokens
41
from astakos.im.models import *
42
from astakos.im import functions
43
from astakos.im import settings as astakos_settings
44
from astakos.im import forms
45

    
46
from urllib import quote
47

    
48
from astakos.im import messages
49

    
50

    
51
astakos_settings.EMAILCHANGE_ENABLED = True
52

    
53
class ShibbolethClient(Client):
54
    """
55
    A shibboleth agnostic client.
56
    """
57
    VALID_TOKENS = filter(lambda x: not x.startswith("_"), dir(ShibbolethTokens))
58

    
59
    def __init__(self, *args, **kwargs):
60
        self.tokens = kwargs.pop('tokens', {})
61
        super(ShibbolethClient, self).__init__(*args, **kwargs)
62

    
63
    def set_tokens(self, **kwargs):
64
        for key, value in kwargs.iteritems():
65
            key = 'SHIB_%s' % key.upper()
66
            if not key in self.VALID_TOKENS:
67
                raise Exception('Invalid shibboleth token')
68

    
69
            self.tokens[key] = value
70

    
71
    def unset_tokens(self, *keys):
72
        for key in keys:
73
            key = 'SHIB_%s' % param.upper()
74
            if key in self.tokens:
75
                del self.tokens[key]
76

    
77
    def reset_tokens(self):
78
        self.tokens = {}
79

    
80
    def get_http_token(self, key):
81
        http_header = getattr(ShibbolethTokens, key)
82
        return http_header
83

    
84
    def request(self, **request):
85
        """
86
        Transform valid shibboleth tokens to http headers
87
        """
88
        for token, value in self.tokens.iteritems():
89
            request[self.get_http_token(token)] = value
90

    
91
        for param in request.keys():
92
            key = 'SHIB_%s' % param.upper()
93
            if key in self.VALID_TOKENS:
94
                request[self.get_http_token(key)] = request[param]
95
                del request[param]
96

    
97
        return super(ShibbolethClient, self).request(**request)
98

    
99

    
100
def get_local_user(username, **kwargs):
101
        try:
102
            return AstakosUser.objects.get(email=username)
103
        except:
104
            user_params = {
105
                'username': username,
106
                'email': username,
107
                'is_active': True,
108
                'activation_sent': datetime.now(),
109
                'email_verified': True,
110
                'provider': 'local'
111
            }
112
            user_params.update(kwargs)
113
            user = AstakosUser(**user_params)
114
            user.set_password(kwargs.get('password', 'password'))
115
            user.save()
116
            user.add_auth_provider('local', auth_backend='astakos')
117
            if kwargs.get('is_active', True):
118
                user.is_active = True
119
            else:
120
                user.is_active = False
121
            user.save()
122
            return user
123

    
124

    
125
def get_mailbox(email):
126
    mails = []
127
    for sent_email in mail.outbox:
128
        for recipient in sent_email.recipients():
129
            if email in recipient:
130
                mails.append(sent_email)
131
    return mails
132

    
133

    
134
class ShibbolethTests(TestCase):
135
    """
136
    Testing shibboleth authentication.
137
    """
138

    
139
    fixtures = ['groups']
140

    
141
    def setUp(self):
142
        kind = GroupKind.objects.create(name="default")
143
        AstakosGroup.objects.create(name="default", kind=kind)
144
        self.client = ShibbolethClient()
145
        settings.ASTAKOS_IM_MODULES = ['local', 'shibboleth']
146
        settings.ASTAKOS_MODERATION_ENABLED = True
147

    
148
    def test_create_account(self):
149

    
150
        client = ShibbolethClient()
151

    
152
        # shibboleth views validation
153
        # eepn required
154
        r = client.get('/im/login/shibboleth?', follow=True)
155
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
156
            'domain': astakos_settings.BASEURL,
157
            'contact_email': astakos_settings.DEFAULT_CONTACT_EMAIL
158
        })
159
        client.set_tokens(eppn="kpapeppn")
160

    
161
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
162
        # shibboleth user info required
163
        r = client.get('/im/login/shibboleth?', follow=True)
164
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
165
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
166

    
167
        # shibboleth logged us in
168
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn",
169
                          cn="Kostas Papadimitriou",
170
                          ep_affiliation="Test Affiliation")
171
        r = client.get('/im/login/shibboleth?')
172
        self.assertEqual(r.status_code, 200)
173

    
174
        # astakos asks if we want to add shibboleth
175
        self.assertContains(r, "Already have an account?")
176

    
177
        # a new pending user created
178
        pending_user = PendingThirdPartyUser.objects.get(
179
            third_party_identifier="kpapeppn")
180
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
181
        # keep the token for future use
182
        token = pending_user.token
183
        # from now on no shibboleth headers are sent to the server
184
        client.reset_tokens()
185

    
186
        # this is the old way, it should fail, to avoid pending user take over
187
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
188
        self.assertEqual(r.status_code, 404)
189

    
190
        # this is the signup unique url associated with the pending user created
191
        r = client.get('/im/signup/?third_party_token=%s' % token)
192
        form = r.context['form']
193
        post_data = {'third_party_identifier': pending_user.third_party_identifier,
194
                     'first_name': 'Kostas',
195
                     'third_party_token': token,
196
                     'last_name': 'Mitroglou',
197
                     'provider': 'shibboleth'
198
                    }
199

    
200
        # invlid email
201
        post_data['email'] = 'kpap'
202
        r = client.post('/im/signup', post_data)
203
        self.assertContains(r, token)
204

    
205
        # existing email
206
        existing_user = get_local_user('test@test.com')
207
        post_data['email'] = 'test@test.com'
208
        r = client.post('/im/signup', post_data)
209
        self.assertContains(r, messages.EMAIL_USED)
210
        existing_user.delete()
211

    
212
        # and finally a valid signup
213
        post_data['email'] = 'kpap@grnet.gr'
214
        r = client.post('/im/signup', post_data, follow=True)
215
        self.assertContains(r, messages.NOTIFICATION_SENT)
216

    
217
        # everything is ok in our db
218
        self.assertEqual(AstakosUser.objects.count(), 1)
219
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
220
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
221

    
222
        # provider info stored
223
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
224
        self.assertEqual(provider.affiliation, 'Test Affiliation')
225
        self.assertEqual(provider.info, {u'email': u'kpap@grnet.gr',
226
                                         u'eppn': u'kpapeppn',
227
                                         u'name': u'Kostas Papadimitriou'})
228

    
229
        # lets login (not activated yet)
230
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn",
231
                          cn="Kostas Papadimitriou", )
232
        r = client.get("/im/login/shibboleth?", follow=True)
233
        self.assertContains(r, messages.ACCOUNT_PENDING_MODERATION)
234
        r = client.get("/im/profile", follow=True)
235
        self.assertRedirects(r, 'http://testserver/im/?next=%2Fim%2Fprofile')
236

    
237
        # admin activates our user
238
        u = AstakosUser.objects.get(username="kpap@grnet.gr")
239
        functions.activate(u)
240
        self.assertEqual(u.is_active, True)
241

    
242
        # we see our profile
243
        r = client.get("/im/login/shibboleth?", follow=True)
244
        self.assertRedirects(r, '/im/profile')
245
        self.assertEqual(r.status_code, 200)
246

    
247
    def test_existing(self):
248
        """
249
        Test adding of third party login to an existing account
250
        """
251

    
252
        # this is our existing user
253
        existing_user = get_local_user('kpap@grnet.gr')
254

    
255
        client = ShibbolethClient()
256
        # shibboleth logged us in, notice that we use different email
257
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
258
                          cn="Kostas Papadimitriou", )
259
        r = client.get("/im/login/shibboleth?")
260

    
261
        # astakos asks if we want to switch a local account to shibboleth
262
        self.assertContains(r, "Already have an account?")
263

    
264
        # a new pending user created
265
        pending_user = PendingThirdPartyUser.objects.get()
266
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
267
        pending_key = pending_user.token
268
        client.reset_tokens()
269

    
270
        # we choose to add shibboleth to an our existing account
271
        # we get redirected to login page with the pending token set
272
        r = client.get('/im/login?key=%s' % pending_key)
273
        post_data = {'password': 'password',
274
                     'username': 'kpap@grnet.gr',
275
                     'key': pending_key}
276
        r = client.post('/im/local', post_data, follow=True)
277
        self.assertRedirects(r, "/im/profile")
278
        self.assertContains(r, messages.AUTH_PROVIDER_ADDED)
279

    
280
        self.assertTrue(existing_user.has_auth_provider('shibboleth'))
281
        self.assertTrue(existing_user.has_auth_provider('local',
282
                                                        auth_backend='astakos'))
283
        client.logout()
284

    
285
        # check that we cannot assign same third party provide twice
286
        r = client.get('/im/login?key=%s' % pending_key)
287
        post_data = {'password': 'password',
288
                     'username': 'kpap@grnet.gr',
289
                     'key': pending_key}
290
        r = self.client.post('/im/local', post_data, follow=True)
291
        self.assertContains(r, messages.AUTH_PROVIDER_ADD_FAILED)
292
        self.client.logout()
293
        client.logout()
294

    
295
        # look Ma, i can login with both my shibboleth and local account
296
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
297
                          cn="Kostas Papadimitriou")
298
        r = client.get("/im/login/shibboleth?", follow=True)
299
        self.assertTrue(r.context['request'].user.is_authenticated())
300
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
301
        self.assertRedirects(r, '/im/profile')
302
        self.assertEqual(r.status_code, 200)
303
        client.logout()
304
        client.reset_tokens()
305

    
306
        # logged out
307
        r = client.get("/im/profile", follow=True)
308
        self.assertFalse(r.context['request'].user.is_authenticated())
309

    
310
        # login with local account also works
311
        post_data = {'password': 'password',
312
                     'username': 'kpap@grnet.gr'}
313
        r = self.client.post('/im/local', post_data, follow=True)
314
        self.assertTrue(r.context['request'].user.is_authenticated())
315
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
316
        self.assertRedirects(r, '/im/profile')
317
        self.assertEqual(r.status_code, 200)
318

    
319
        # cannot add the same eppn
320
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
321
                          cn="Kostas Papadimitriou", )
322
        r = client.get("/im/login/shibboleth?", follow=True)
323
        self.assertRedirects(r, '/im/profile')
324
        self.assertTrue(r.status_code, 200)
325
        self.assertEquals(existing_user.auth_providers.count(), 2)
326

    
327
        # but can add additional eppn
328
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
329
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
330
        r = client.get("/im/login/shibboleth?", follow=True)
331
        new_provider = existing_user.auth_providers.get(identifier="kpapeppn2")
332
        self.assertRedirects(r, '/im/profile')
333
        self.assertTrue(r.status_code, 200)
334
        self.assertEquals(existing_user.auth_providers.count(), 3)
335
        self.assertEqual(new_provider.affiliation, 'affil2')
336
        client.logout()
337
        client.reset_tokens()
338

    
339
        # cannot login with another eppn
340
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppninvalid",
341
                          cn="Kostas Papadimitriou")
342
        r = client.get("/im/login/shibboleth?", follow=True)
343
        self.assertFalse(r.context['request'].user.is_authenticated())
344

    
345
        # lets remove local password
346
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
347
                                       email="kpap@grnet.gr")
348
        remove_local_url = user.get_provider_remove_url('local')
349
        remove_shibbo_url = user.get_provider_remove_url('shibboleth',
350
                                                         identifier='kpapeppn')
351
        remove_shibbo2_url = user.get_provider_remove_url('shibboleth',
352
                                                         identifier='kpapeppn2')
353
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
354
                          cn="Kostas Papadimtriou")
355
        r = client.get("/im/login/shibboleth?", follow=True)
356
        client.reset_tokens()
357

    
358
        # TODO: this view should use POST
359
        r = client.get(remove_local_url)
360
        # 2 providers left
361
        self.assertEqual(user.auth_providers.count(), 2)
362
        r = client.get(remove_shibbo2_url)
363
        # 1 provider left
364
        self.assertEqual(user.auth_providers.count(), 1)
365
        # cannot remove last provider
366
        r = client.get(remove_shibbo_url)
367
        self.assertEqual(r.status_code, 403)
368
        self.client.logout()
369

    
370
        # cannot login using local credentials (notice we use another client)
371
        post_data = {'password': 'password',
372
                     'username': 'kpap@grnet.gr'}
373
        r = self.client.post('/im/local', post_data, follow=True)
374
        self.assertFalse(r.context['request'].user.is_authenticated())
375

    
376
        # we can reenable the local provider by setting a password
377
        r = client.get("/im/password_change", follow=True)
378
        r = client.post("/im/password_change", {'new_password1':'111',
379
                                                'new_password2': '111'},
380
                        follow=True)
381
        user = r.context['request'].user
382
        self.assertTrue(user.has_auth_provider('local'))
383
        self.assertTrue(user.has_auth_provider('shibboleth'))
384
        self.assertTrue(user.check_password('111'))
385
        self.assertTrue(user.has_usable_password())
386
        self.client.logout()
387

    
388
        # now we can login
389
        post_data = {'password': '111',
390
                     'username': 'kpap@grnet.gr'}
391
        r = self.client.post('/im/local', post_data, follow=True)
392
        self.assertTrue(r.context['request'].user.is_authenticated())
393

    
394
        client.reset_tokens()
395

    
396
        # we cannot take over another shibboleth identifier
397
        user2 = get_local_user('another@grnet.gr')
398
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
399
        # login
400
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
401
                          cn="Kostas Papadimitriou")
402
        r = client.get("/im/login/shibboleth?", follow=True)
403
        # try to assign existing shibboleth identifier of another user
404
        client.set_tokens(mail="kpap_second@shibboleth.gr", eppn="existingeppn",
405
                          cn="Kostas Papadimitriou")
406
        r = client.get("/im/login/shibboleth?", follow=True)
407
        self.assertContains(r, messages.AUTH_PROVIDER_ADD_FAILED)
408
        self.assertContains(r, messages.AUTH_PROVIDER_ADD_EXISTS)
409

    
410

    
411
class LocalUserTests(TestCase):
412

    
413
    fixtures = ['groups']
414

    
415
    def setUp(self):
416
        kind = GroupKind.objects.create(name="default")
417
        AstakosGroup.objects.create(name="default", kind=kind)
418
        from django.conf import settings
419
        settings.ADMINS = (('admin', 'support@cloud.grnet.gr'),)
420
        settings.SERVER_EMAIL = 'no-reply@grnet.gr'
421

    
422
    def test_no_moderation(self):
423
        # disable moderation
424
        astakos_settings.MODERATION_ENABLED = False
425

    
426
        # create a new user
427
        r = self.client.get("/im/signup")
428
        self.assertEqual(r.status_code, 200)
429
        data = {'email':'kpap@grnet.gr', 'password1':'password',
430
                'password2':'password', 'first_name': 'Kostas',
431
                'last_name': 'Mitroglou', 'provider': 'local'}
432
        r = self.client.post("/im/signup", data)
433

    
434
        # user created
435
        self.assertEqual(AstakosUser.objects.count(), 1)
436
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
437
                                       email="kpap@grnet.gr")
438
        self.assertEqual(user.username, 'kpap@grnet.gr')
439
        self.assertEqual(user.has_auth_provider('local'), True)
440
        self.assertFalse(user.is_active)
441

    
442
        # user (but not admin) gets notified
443
        self.assertEqual(len(get_mailbox('support@cloud.grnet.gr')), 0)
444
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
445
        astakos_settings.MODERATION_ENABLED = True
446

    
447
    def test_email_case(self):
448
        data = {
449
          'email': 'kPap@grnet.gr',
450
          'password1': '1234',
451
          'password2': '1234'
452
        }
453

    
454
        form = forms.LocalUserCreationForm(data)
455
        self.assertTrue(form.is_valid())
456
        user = form.save()
457
        form.store_user(user, {})
458

    
459
        u = AstakosUser.objects.get(pk=1)
460
        self.assertEqual(u.email, 'kPap@grnet.gr')
461
        self.assertEqual(u.username, 'kpap@grnet.gr')
462
        u.is_active = True
463
        u.email_verified = True
464
        u.save()
465

    
466
        data = {'username': 'kpap@grnet.gr', 'password': '1234'}
467
        login = forms.LoginForm(data=data)
468
        self.assertTrue(login.is_valid())
469

    
470
        data = {'username': 'KpaP@grnet.gr', 'password': '1234'}
471
        login = forms.LoginForm(data=data)
472
        self.assertTrue(login.is_valid())
473

    
474
        data = {
475
          'email': 'kpap@grnet.gr',
476
          'password1': '1234',
477
          'password2': '1234'
478
        }
479
        form = forms.LocalUserCreationForm(data)
480
        self.assertFalse(form.is_valid())
481

    
482
    def test_local_provider(self):
483
        # enable moderation
484
        astakos_settings.MODERATION_ENABLED = True
485

    
486
        # create a user
487
        r = self.client.get("/im/signup")
488
        self.assertEqual(r.status_code, 200)
489
        data = {'email':'kpap@grnet.gr', 'password1':'password',
490
                'password2':'password', 'first_name': 'Kostas',
491
                'last_name': 'Mitroglou', 'provider': 'local'}
492
        r = self.client.post("/im/signup", data)
493

    
494
        # user created
495
        self.assertEqual(AstakosUser.objects.count(), 1)
496
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
497
                                       email="kpap@grnet.gr")
498
        self.assertEqual(user.username, 'kpap@grnet.gr')
499
        self.assertEqual(user.has_auth_provider('local'), True)
500
        self.assertFalse(user.is_active) # not activated
501
        self.assertFalse(user.email_verified) # not verified
502
        self.assertFalse(user.activation_sent) # activation automatically sent
503

    
504
        # admin gets notified and activates the user from the command line
505
        self.assertEqual(len(get_mailbox('support@cloud.grnet.gr')), 1)
506
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
507
                                                 'password': 'password'})
508
        self.assertContains(r, messages.ACCOUNT_PENDING_MODERATION)
509
        functions.send_activation(user)
510

    
511
        # user activation fields updated and user gets notified via email
512
        user = AstakosUser.objects.get(pk=user.pk)
513
        self.assertTrue(user.activation_sent)
514
        self.assertFalse(user.email_verified)
515
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
516

    
517
        # user forgot she got registered and tries to submit registration
518
        # form. Notice the upper case in email
519
        data = {'email':'KPAP@grnet.gr', 'password1':'password',
520
                'password2':'password', 'first_name': 'Kostas',
521
                'last_name': 'Mitroglou', 'provider': 'local'}
522
        r = self.client.post("/im/signup", data)
523
        self.assertContains(r, messages.EMAIL_USED)
524

    
525
        # hmmm, email exists; lets get the password
526
        r = self.client.get('/im/local/password_reset')
527
        self.assertEqual(r.status_code, 200)
528
        r = self.client.post('/im/local/password_reset', {'email':
529
                                                          'kpap@grnet.gr'})
530
        # she can't because account is not active yet
531
        self.assertContains(r, "doesn't have an associated user account")
532

    
533
        # moderation is enabled so no automatic activation can be send
534
        r = self.client.get('/im/send/activation/%d' % user.pk)
535
        self.assertEqual(r.status_code, 403)
536
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
537

    
538
        # also she cannot login
539
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
540
                                                 'password': 'password'})
541
        self.assertContains(r, messages.ACCOUNT_PENDING_ACTIVATION_HELP)
542
        self.assertContains(r, messages.ACCOUNT_PENDING_ACTIVATION)
543
        self.assertNotContains(r, 'Resend activation')
544
        self.assertFalse(r.context['request'].user.is_authenticated())
545
        self.assertFalse('_pithos2_a' in self.client.cookies)
546

    
547
        # same with disabled moderation
548
        astakos_settings.MODERATION_ENABLED = False
549
        r = self.client.post('/im/local/password_reset', {'email':
550
                                                          'kpap@grnet.gr'})
551
        self.assertContains(r, "doesn't have an associated user account")
552
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
553
                                                 'password': 'password'})
554
        self.assertContains(r, messages.ACCOUNT_PENDING_ACTIVATION)
555
        self.assertContains(r, 'Resend activation')
556
        self.assertFalse(r.context['request'].user.is_authenticated())
557
        self.assertFalse('_pithos2_a' in self.client.cookies)
558

    
559
        # user sees the message and resends activation
560
        r = self.client.get('/im/send/activation/%d' % user.pk)
561
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 2)
562

    
563
        # switch back moderation setting
564
        astakos_settings.MODERATION_ENABLED = True
565
        r = self.client.get(user.get_activation_url(), follow=True)
566
        self.assertRedirects(r, "/im/profile")
567
        self.assertContains(r, "kpap@grnet.gr")
568
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 3)
569

    
570
        user = AstakosUser.objects.get(pk=user.pk)
571
        # user activated and logged in, token cookie set
572
        self.assertTrue(r.context['request'].user.is_authenticated())
573
        self.assertTrue('_pithos2_a' in self.client.cookies)
574
        cookies = self.client.cookies
575
        self.assertTrue(quote(user.auth_token) in cookies.get('_pithos2_a').value)
576
        r = self.client.get('/im/logout', follow=True)
577
        r = self.client.get('/im/')
578
        # user logged out, token cookie removed
579
        self.assertFalse(r.context['request'].user.is_authenticated())
580
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
581
        # https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
582
        del self.client.cookies['_pithos2_a']
583

    
584
        # user can login
585
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
586
                                           'password': 'password'},
587
                                          follow=True)
588
        self.assertTrue(r.context['request'].user.is_authenticated())
589
        self.assertTrue('_pithos2_a' in self.client.cookies)
590
        cookies = self.client.cookies
591
        self.assertTrue(quote(user.auth_token) in cookies.get('_pithos2_a').value)
592
        self.client.get('/im/logout', follow=True)
593

    
594
        # user forgot password
595
        old_pass = user.password
596
        r = self.client.get('/im/local/password_reset')
597
        self.assertEqual(r.status_code, 200)
598
        r = self.client.post('/im/local/password_reset', {'email':
599
                                                          'kpap@grnet.gr'})
600
        self.assertEqual(r.status_code, 302)
601
        # email sent
602
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 4)
603

    
604
        # user visits change password link
605
        r = self.client.get(user.get_password_reset_url())
606
        r = self.client.post(user.get_password_reset_url(),
607
                            {'new_password1':'newpass',
608
                             'new_password2':'newpass'})
609

    
610
        user = AstakosUser.objects.get(pk=user.pk)
611
        self.assertNotEqual(old_pass, user.password)
612

    
613
        # old pass is not usable
614
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
615
                                           'password': 'password'})
616
        self.assertContains(r, 'Please enter a correct username and password')
617
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
618
                                           'password': 'newpass'},
619
                                           follow=True)
620
        self.assertTrue(r.context['request'].user.is_authenticated())
621
        self.client.logout()
622

    
623
        # tests of special local backends
624
        user = AstakosUser.objects.get(pk=user.pk)
625
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
626
        user.save()
627

    
628
        # non astakos local backends do not support password reset
629
        r = self.client.get('/im/local/password_reset')
630
        self.assertEqual(r.status_code, 200)
631
        r = self.client.post('/im/local/password_reset', {'email':
632
                                                          'kpap@grnet.gr'})
633
        # she can't because account is not active yet
634
        self.assertContains(r, "Password change for this account is not"
635
                                " supported")
636

    
637
class UserActionsTests(TestCase):
638

    
639
    def setUp(self):
640
        kind = GroupKind.objects.create(name="default")
641
        AstakosGroup.objects.create(name="default", kind=kind)
642

    
643
    def test_email_change(self):
644
        # to test existing email validation
645
        existing_user = get_local_user('existing@grnet.gr')
646

    
647
        # local user
648
        user = get_local_user('kpap@grnet.gr')
649

    
650
        # login as kpap
651
        self.client.login(username='kpap@grnet.gr', password='password')
652
        r = self.client.get('/im/profile', follow=True)
653
        user = r.context['request'].user
654
        self.assertTrue(user.is_authenticated())
655

    
656
        # change email is enabled
657
        r = self.client.get('/im/email_change')
658
        self.assertEqual(r.status_code, 200)
659
        self.assertFalse(user.email_change_is_pending())
660

    
661
        # request email change to an existing email fails
662
        data = {'new_email_address': 'existing@grnet.gr'}
663
        r = self.client.post('/im/email_change', data)
664
        self.assertContains(r, messages.EMAIL_USED)
665

    
666
        # proper email change
667
        data = {'new_email_address': 'kpap@gmail.com'}
668
        r = self.client.post('/im/email_change', data, follow=True)
669
        self.assertRedirects(r, '/im/profile')
670
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
671
        change1 = EmailChange.objects.get()
672

    
673
        # user sees a warning
674
        r = self.client.get('/im/email_change')
675
        self.assertEqual(r.status_code, 200)
676
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
677
        self.assertTrue(user.email_change_is_pending())
678

    
679
        # link was sent
680
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 0)
681
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
682

    
683
        # proper email change
684
        data = {'new_email_address': 'kpap@yahoo.com'}
685
        r = self.client.post('/im/email_change', data, follow=True)
686
        self.assertRedirects(r, '/im/profile')
687
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
688
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 0)
689
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
690
        change2 = EmailChange.objects.get()
691

    
692
        r = self.client.get(change1.get_url())
693
        self.assertEquals(r.status_code, 302)
694
        self.client.logout()
695

    
696
        r = self.client.post('/im/local?next=' + change2.get_url(),
697
                             {'username': 'kpap@grnet.gr',
698
                              'password': 'password',
699
                              'next': change2.get_url()},
700
                             follow=True)
701
        self.assertRedirects(r, '/im/profile')
702
        user = r.context['request'].user
703
        self.assertEquals(user.email, 'kpap@yahoo.com')
704
        self.assertEquals(user.username, 'kpap@yahoo.com')
705

    
706

    
707
        self.client.logout()
708
        r = self.client.post('/im/local?next=' + change2.get_url(),
709
                             {'username': 'kpap@grnet.gr',
710
                              'password': 'password',
711
                              'next': change2.get_url()},
712
                             follow=True)
713
        self.assertContains(r, "Please enter a correct username and password")
714
        self.assertEqual(user.emailchanges.count(), 0)
715