Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests.py @ 01b8fb9a

History | View | Annotate | Download (80.8 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from contextlib import contextmanager
34

    
35
import copy
36
import datetime
37
import functools
38

    
39
from snf_django.utils.testing import with_settings, override_settings, assertIn
40

    
41
from django.test import Client
42
from django.test import TransactionTestCase as TestCase
43
from django.core import mail
44
from django.http import SimpleCookie, HttpRequest, QueryDict
45
from django.utils.importlib import import_module
46
from django.utils import simplejson as json
47

    
48
from astakos.im.activation_backends import *
49
from astakos.im.target.shibboleth import Tokens as ShibbolethTokens
50
from astakos.im.models import *
51
from astakos.im import functions
52
from astakos.im import settings as astakos_settings
53
from astakos.im import forms
54
from astakos.im import activation_backends
55

    
56
from urllib import quote
57
from datetime import timedelta
58

    
59
from astakos.im import messages
60
from astakos.im import auth_providers
61
from astakos.im import quotas
62
from astakos.im import resources
63

    
64
from django.conf import settings
65

    
66

    
67
# set some common settings
68
astakos_settings.EMAILCHANGE_ENABLED = True
69
astakos_settings.RECAPTCHA_ENABLED = False
70

    
71
settings.LOGGING_SETUP['disable_existing_loggers'] = False
72

    
73
# shortcut decorators to override provider settings
74
# e.g. shibboleth_settings(ENABLED=True) will set
75
# ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_ENABLED = True in global synnefo settings
76
prefixes = {'providers': 'AUTH_PROVIDER_',
77
            'shibboleth': 'ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_',
78
            'local': 'ASTAKOS_AUTH_PROVIDER_LOCAL_'}
79
im_settings = functools.partial(with_settings, astakos_settings)
80
shibboleth_settings = functools.partial(with_settings,
81
                                        settings,
82
                                        prefix=prefixes['shibboleth'])
83
localauth_settings = functools.partial(with_settings, settings,
84
                                       prefix=prefixes['local'])
85

    
86

    
87
class AstakosTestClient(Client):
88
    pass
89

    
90

    
91
class ShibbolethClient(AstakosTestClient):
92
    """
93
    A shibboleth agnostic client.
94
    """
95
    VALID_TOKENS = filter(lambda x: not x.startswith("_"),
96
                          dir(ShibbolethTokens))
97

    
98
    def __init__(self, *args, **kwargs):
99
        self.tokens = kwargs.pop('tokens', {})
100
        super(ShibbolethClient, self).__init__(*args, **kwargs)
101

    
102
    def set_tokens(self, **kwargs):
103
        for key, value in kwargs.iteritems():
104
            key = 'SHIB_%s' % key.upper()
105
            if not key in self.VALID_TOKENS:
106
                raise Exception('Invalid shibboleth token')
107

    
108
            self.tokens[key] = value
109

    
110
    def unset_tokens(self, *keys):
111
        for key in keys:
112
            key = 'SHIB_%s' % param.upper()
113
            if key in self.tokens:
114
                del self.tokens[key]
115

    
116
    def reset_tokens(self):
117
        self.tokens = {}
118

    
119
    def get_http_token(self, key):
120
        http_header = getattr(ShibbolethTokens, key)
121
        return http_header
122

    
123
    def request(self, **request):
124
        """
125
        Transform valid shibboleth tokens to http headers
126
        """
127
        for token, value in self.tokens.iteritems():
128
            request[self.get_http_token(token)] = value
129

    
130
        for param in request.keys():
131
            key = 'SHIB_%s' % param.upper()
132
            if key in self.VALID_TOKENS:
133
                request[self.get_http_token(key)] = request[param]
134
                del request[param]
135

    
136
        return super(ShibbolethClient, self).request(**request)
137

    
138

    
139
def get_user_client(username, password="password"):
140
    client = Client()
141
    client.login(username=username, password=password)
142
    return client
143

    
144

    
145
def get_local_user(username, **kwargs):
146
        try:
147
            return AstakosUser.objects.get(email=username)
148
        except:
149
            user_params = {
150
                'username': username,
151
                'email': username,
152
                'is_active': True,
153
                'activation_sent': datetime.now(),
154
                'email_verified': True
155
            }
156
            user_params.update(kwargs)
157
            user = AstakosUser(**user_params)
158
            user.set_password(kwargs.get('password', 'password'))
159
            user.renew_verification_code()
160
            user.save()
161
            user.add_auth_provider('local', auth_backend='astakos')
162
            if kwargs.get('is_active', True):
163
                user.is_active = True
164
            else:
165
                user.is_active = False
166
            user.save()
167
            return user
168

    
169

    
170
def get_mailbox(email):
171
    mails = []
172
    for sent_email in mail.outbox:
173
        for recipient in sent_email.recipients():
174
            if email in recipient:
175
                mails.append(sent_email)
176
    return mails
177

    
178

    
179
class ShibbolethTests(TestCase):
180
    """
181
    Testing shibboleth authentication.
182
    """
183

    
184
    fixtures = ['groups']
185

    
186
    def setUp(self):
187
        self.client = ShibbolethClient()
188
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
189
        astakos_settings.MODERATION_ENABLED = True
190

    
191
    @im_settings(FORCE_PROFILE_UPDATE=False)
192
    def test_create_account(self):
193

    
194
        client = ShibbolethClient()
195

    
196
        # shibboleth views validation
197
        # eepn required
198
        r = client.get('/im/login/shibboleth?', follow=True)
199
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
200
            'domain': astakos_settings.BASEURL,
201
            'contact_email': settings.CONTACT_EMAIL
202
        })
203
        client.set_tokens(eppn="kpapeppn")
204

    
205
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
206
        # shibboleth user info required
207
        r = client.get('/im/login/shibboleth?', follow=True)
208
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
209
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
210

    
211
        # shibboleth logged us in
212
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
213
                          cn="Kostas Papadimitriou",
214
                          ep_affiliation="Test Affiliation")
215
        r = client.get('/im/login/shibboleth?', follow=True)
216
        token = PendingThirdPartyUser.objects.get().token
217
        self.assertRedirects(r, '/im/signup?third_party_token=%s' % token)
218
        self.assertEqual(r.status_code, 200)
219

    
220
        # a new pending user created
221
        pending_user = PendingThirdPartyUser.objects.get(
222
            third_party_identifier="kpapeppn")
223
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
224
        # keep the token for future use
225
        token = pending_user.token
226
        # from now on no shibboleth headers are sent to the server
227
        client.reset_tokens()
228

    
229
        # this is the old way, it should fail, to avoid pending user take over
230
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
231
        self.assertEqual(r.status_code, 404)
232

    
233
        # this is the signup unique url associated with the pending user
234
        # created
235
        r = client.get('/im/signup/?third_party_token=%s' % token)
236
        identifier = pending_user.third_party_identifier
237
        post_data = {'third_party_identifier': identifier,
238
                     'first_name': 'Kostas',
239
                     'third_party_token': token,
240
                     'last_name': 'Mitroglou',
241
                     'provider': 'shibboleth'}
242

    
243
        signup_url = reverse('signup')
244

    
245
        # invlid email
246
        post_data['email'] = 'kpap'
247
        r = client.post(signup_url, post_data)
248
        self.assertContains(r, token)
249

    
250
        # existing email
251
        existing_user = get_local_user('test@test.com')
252
        post_data['email'] = 'test@test.com'
253
        r = client.post(signup_url, post_data)
254
        self.assertContains(r, messages.EMAIL_USED)
255
        existing_user.delete()
256

    
257
        # and finally a valid signup
258
        post_data['email'] = 'kpap@synnefo.org'
259
        r = client.post(signup_url, post_data, follow=True)
260
        self.assertContains(r, messages.VERIFICATION_SENT)
261

    
262
        # entires commited as expected
263
        self.assertEqual(AstakosUser.objects.count(), 1)
264
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
265
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
266

    
267
        # provider info stored
268
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
269
        self.assertEqual(provider.affiliation, 'Test Affiliation')
270
        self.assertEqual(provider.info, {u'email': u'kpap@synnefo.org',
271
                                         u'eppn': u'kpapeppn',
272
                                         u'name': u'Kostas Papadimitriou'})
273

    
274
        # login (not activated yet)
275
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
276
                          cn="Kostas Papadimitriou", )
277
        r = client.get("/im/login/shibboleth?", follow=True)
278
        self.assertContains(r, 'is pending moderation')
279

    
280
        # admin activates the user
281
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
282
        backend = activation_backends.get_backend()
283
        activation_result = backend.verify_user(u, u.verification_code)
284
        activation_result = backend.accept_user(u)
285
        self.assertFalse(activation_result.is_error())
286
        backend.send_result_notifications(activation_result, u)
287
        self.assertEqual(u.is_active, True)
288

    
289
        # we see our profile
290
        r = client.get("/im/login/shibboleth?", follow=True)
291
        self.assertRedirects(r, '/im/landing')
292
        self.assertEqual(r.status_code, 200)
293

    
294
    def test_existing(self):
295
        """
296
        Test adding of third party login to an existing account
297
        """
298

    
299
        # this is our existing user
300
        existing_user = get_local_user('kpap@synnefo.org')
301
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
302
        existing_inactive.is_active = False
303
        existing_inactive.save()
304

    
305
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
306
        existing_unverified.is_active = False
307
        existing_unverified.activation_sent = None
308
        existing_unverified.email_verified = False
309
        existing_unverified.is_verified = False
310
        existing_unverified.save()
311

    
312
        client = ShibbolethClient()
313
        # shibboleth logged us in, notice that we use different email
314
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
315
                          cn="Kostas Papadimitriou", )
316
        r = client.get("/im/login/shibboleth?", follow=True)
317

    
318
        # a new pending user created
319
        pending_user = PendingThirdPartyUser.objects.get()
320
        token = pending_user.token
321
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
322
        pending_key = pending_user.token
323
        client.reset_tokens()
324
        self.assertRedirects(r, "/im/signup?third_party_token=%s" % token)
325

    
326
        form = r.context['form']
327
        signupdata = copy.copy(form.initial)
328
        signupdata['email'] = 'kpap@synnefo.org'
329
        signupdata['third_party_token'] = token
330
        signupdata['provider'] = 'shibboleth'
331
        signupdata.pop('id', None)
332

    
333
        # the email exists to another user
334
        r = client.post("/im/signup", signupdata)
335
        self.assertContains(r, "There is already an account with this email "
336
                               "address")
337
        # change the case, still cannot create
338
        signupdata['email'] = 'KPAP@synnefo.org'
339
        r = client.post("/im/signup", signupdata)
340
        self.assertContains(r, "There is already an account with this email "
341
                               "address")
342
        # inactive user
343
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
344
        r = client.post("/im/signup", signupdata)
345
        self.assertContains(r, "There is already an account with this email "
346
                               "address")
347

    
348
        # unverified user, this should pass, old entry will be deleted
349
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
350
        r = client.post("/im/signup", signupdata)
351

    
352
        post_data = {'password': 'password',
353
                     'username': 'kpap@synnefo.org'}
354
        r = client.post('/im/local', post_data, follow=True)
355
        self.assertTrue(r.context['request'].user.is_authenticated())
356
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
357
                          cn="Kostas Papadimitriou", )
358
        r = client.get("/im/login/shibboleth?", follow=True)
359
        self.assertContains(r, "enabled for this account")
360
        client.reset_tokens()
361

    
362
        user = existing_user
363
        self.assertTrue(user.has_auth_provider('shibboleth'))
364
        self.assertTrue(user.has_auth_provider('local',
365
                                               auth_backend='astakos'))
366
        client.logout()
367

    
368
        # look Ma, i can login with both my shibboleth and local account
369
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
370
                          cn="Kostas Papadimitriou")
371
        r = client.get("/im/login/shibboleth?", follow=True)
372
        self.assertTrue(r.context['request'].user.is_authenticated())
373
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
374
        self.assertRedirects(r, '/im/landing')
375
        self.assertEqual(r.status_code, 200)
376
        client.logout()
377
        client.reset_tokens()
378

    
379
        # logged out
380
        r = client.get("/im/profile", follow=True)
381
        self.assertFalse(r.context['request'].user.is_authenticated())
382

    
383
        # login with local account also works
384
        post_data = {'password': 'password',
385
                     'username': 'kpap@synnefo.org'}
386
        r = self.client.post('/im/local', post_data, follow=True)
387
        self.assertTrue(r.context['request'].user.is_authenticated())
388
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
389
        self.assertRedirects(r, '/im/landing')
390
        self.assertEqual(r.status_code, 200)
391

    
392
        # cannot add the same eppn
393
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
394
                          cn="Kostas Papadimitriou", )
395
        r = client.get("/im/login/shibboleth?", follow=True)
396
        self.assertRedirects(r, '/im/landing')
397
        self.assertTrue(r.status_code, 200)
398
        self.assertEquals(existing_user.auth_providers.count(), 2)
399

    
400
        # only one allowed by default
401
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
402
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
403
        prov = auth_providers.get_provider('shibboleth')
404
        r = client.get("/im/login/shibboleth?", follow=True)
405
        self.assertContains(r, "Failed to add")
406
        self.assertRedirects(r, '/im/profile')
407
        self.assertTrue(r.status_code, 200)
408
        self.assertEquals(existing_user.auth_providers.count(), 2)
409
        client.logout()
410
        client.reset_tokens()
411

    
412
        # cannot login with another eppn
413
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppninvalid",
414
                          cn="Kostas Papadimitriou")
415
        r = client.get("/im/login/shibboleth?", follow=True)
416
        self.assertFalse(r.context['request'].user.is_authenticated())
417

    
418
        # cannot
419

    
420
        # lets remove local password
421
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
422
                                       email="kpap@synnefo.org")
423
        remove_local_url = user.get_auth_provider('local').get_remove_url
424
        remove_shibbo_url = user.get_auth_provider('shibboleth',
425
                                                   'kpapeppn').get_remove_url
426
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
427
                          cn="Kostas Papadimtriou")
428
        r = client.get("/im/login/shibboleth?", follow=True)
429
        client.reset_tokens()
430

    
431
        # TODO: this view should use POST
432
        r = client.get(remove_local_url)
433
        # 2 providers left
434
        self.assertEqual(user.auth_providers.count(), 1)
435
        # cannot remove last provider
436
        r = client.get(remove_shibbo_url)
437
        self.assertEqual(r.status_code, 403)
438
        self.client.logout()
439

    
440
        # cannot login using local credentials (notice we use another client)
441
        post_data = {'password': 'password',
442
                     'username': 'kpap@synnefo.org'}
443
        r = self.client.post('/im/local', post_data, follow=True)
444
        self.assertFalse(r.context['request'].user.is_authenticated())
445

    
446
        # we can reenable the local provider by setting a password
447
        r = client.get("/im/password_change", follow=True)
448
        r = client.post("/im/password_change", {'new_password1': '111',
449
                                                'new_password2': '111'},
450
                        follow=True)
451
        user = r.context['request'].user
452
        self.assertTrue(user.has_auth_provider('local'))
453
        self.assertTrue(user.has_auth_provider('shibboleth'))
454
        self.assertTrue(user.check_password('111'))
455
        self.assertTrue(user.has_usable_password())
456
        self.client.logout()
457

    
458
        # now we can login
459
        post_data = {'password': '111',
460
                     'username': 'kpap@synnefo.org'}
461
        r = self.client.post('/im/local', post_data, follow=True)
462
        self.assertTrue(r.context['request'].user.is_authenticated())
463

    
464
        client.reset_tokens()
465

    
466
        # we cannot take over another shibboleth identifier
467
        user2 = get_local_user('another@synnefo.org')
468
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
469
        # login
470
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
471
                          cn="Kostas Papadimitriou")
472
        r = client.get("/im/login/shibboleth?", follow=True)
473
        # try to assign existing shibboleth identifier of another user
474
        client.set_tokens(mail="kpap_second@shibboleth.gr",
475
                          eppn="existingeppn", cn="Kostas Papadimitriou")
476
        r = client.get("/im/login/shibboleth?", follow=True)
477
        self.assertContains(r, "this account is already assigned")
478

    
479

    
480
class TestLocal(TestCase):
481

    
482
    fixtures = ['groups']
483

    
484
    def setUp(self):
485
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
486
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
487
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
488
        settings.ASTAKOS_MODERATION_ENABLED = True
489

    
490
    def tearDown(self):
491
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
492

    
493
    def test_no_moderation(self):
494
        # disable moderation
495
        astakos_settings.MODERATION_ENABLED = False
496

    
497
        # create a new user
498
        r = self.client.get("/im/signup")
499
        self.assertEqual(r.status_code, 200)
500
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
501
                'password2': 'password', 'first_name': 'Kostas',
502
                'last_name': 'Mitroglou', 'provider': 'local'}
503
        r = self.client.post("/im/signup", data)
504

    
505
        # user created
506
        self.assertEqual(AstakosUser.objects.count(), 1)
507
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
508
                                       email="kpap@synnefo.org")
509
        self.assertEqual(user.username, 'kpap@synnefo.org')
510
        self.assertEqual(user.has_auth_provider('local'), True)
511
        self.assertFalse(user.is_active)
512

    
513
        # user (but not admin) gets notified
514
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
515
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
516
        astakos_settings.MODERATION_ENABLED = True
517

    
518
    def test_email_case(self):
519
        data = {
520
            'email': 'kPap@synnefo.org',
521
            'password1': '1234',
522
            'password2': '1234'
523
        }
524

    
525
        form = forms.LocalUserCreationForm(data)
526
        self.assertTrue(form.is_valid())
527
        user = form.save()
528
        form.store_user(user, {})
529

    
530
        u = AstakosUser.objects.get()
531
        self.assertEqual(u.email, 'kPap@synnefo.org')
532
        self.assertEqual(u.username, 'kpap@synnefo.org')
533
        u.is_active = True
534
        u.email_verified = True
535
        u.save()
536

    
537
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
538
        login = forms.LoginForm(data=data)
539
        self.assertTrue(login.is_valid())
540

    
541
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
542
        login = forms.LoginForm(data=data)
543
        self.assertTrue(login.is_valid())
544

    
545
        data = {
546
            'email': 'kpap@synnefo.org',
547
            'password1': '1234',
548
            'password2': '1234'
549
        }
550
        form = forms.LocalUserCreationForm(data)
551
        self.assertFalse(form.is_valid())
552

    
553
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
554
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
555
    def test_local_provider(self):
556
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
557

    
558
        # create a user
559
        r = self.client.get("/im/signup")
560
        self.assertEqual(r.status_code, 200)
561
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
562
                'password2': 'password', 'first_name': 'Kostas',
563
                'last_name': 'Mitroglou', 'provider': 'local'}
564
        r = self.client.post("/im/signup", data)
565

    
566
        # user created
567
        self.assertEqual(AstakosUser.objects.count(), 1)
568
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
569
                                       email="kpap@synnefo.org")
570
        self.assertEqual(user.username, 'kpap@synnefo.org')
571
        self.assertEqual(user.has_auth_provider('local'), True)
572
        self.assertFalse(user.is_active)  # not activated
573
        self.assertFalse(user.email_verified)  # not verified
574
        self.assertTrue(user.activation_sent)  # activation automatically sent
575
        self.assertFalse(user.moderated)
576
        self.assertFalse(user.email_verified)
577

    
578
        # admin gets notified and activates the user from the command line
579
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
580
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
581
                                           'password': 'password'},
582
                             follow=True)
583
        self.assertContains(r, messages.VERIFICATION_SENT)
584
        backend = activation_backends.get_backend()
585

    
586
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
587
        backend.send_user_verification_email(user)
588

    
589
        # user activation fields updated and user gets notified via email
590
        user = AstakosUser.objects.get(pk=user.pk)
591
        self.assertTrue(user.activation_sent)
592
        self.assertFalse(user.email_verified)
593
        self.assertFalse(user.is_active)
594
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
595

    
596
        # user forgot she got registered and tries to submit registration
597
        # form. Notice the upper case in email
598
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
599
                'password2': 'password', 'first_name': 'Kostas',
600
                'last_name': 'Mitroglou', 'provider': 'local'}
601
        r = self.client.post("/im/signup", data, follow=True)
602
        self.assertRedirects(r, reverse('index'))
603
        self.assertContains(r, messages.VERIFICATION_SENT)
604

    
605
        user = AstakosUser.objects.get()
606
        # previous user replaced
607
        self.assertTrue(user.activation_sent)
608
        self.assertFalse(user.email_verified)
609
        self.assertFalse(user.is_active)
610
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
611

    
612
        # hmmm, email exists; lets request a password change
613
        r = self.client.get('/im/local/password_reset')
614
        self.assertEqual(r.status_code, 200)
615
        data = {'email': 'kpap@synnefo.org'}
616
        r = self.client.post('/im/local/password_reset', data, follow=True)
617
        # she can't because account is not active yet
618
        self.assertContains(r, 'pending activation')
619

    
620
        # moderation is enabled and an activation email has already been sent
621
        # so user can trigger resend of the activation email
622
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
623
        self.assertContains(r, 'has been sent to your email address.')
624
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
625

    
626
        # also she cannot login
627
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
628
        r = self.client.post('/im/local', data, follow=True)
629
        self.assertContains(r, 'Resend activation')
630
        self.assertFalse(r.context['request'].user.is_authenticated())
631
        self.assertFalse('_pithos2_a' in self.client.cookies)
632

    
633
        # user sees the message and resends activation
634
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
635
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
636

    
637
        # logged in user cannot activate another account
638
        tmp_user = get_local_user("test_existing_user@synnefo.org")
639
        tmp_client = Client()
640
        tmp_client.login(username="test_existing_user@synnefo.org",
641
                         password="password")
642
        r = tmp_client.get(user.get_activation_url(), follow=True)
643
        self.assertContains(r, messages.LOGGED_IN_WARNING)
644

    
645
        r = self.client.get(user.get_activation_url(), follow=True)
646
        # previous code got invalidated
647
        self.assertEqual(r.status_code, 404)
648

    
649
        user = AstakosUser.objects.get(pk=user.pk)
650
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
651
        r = self.client.get(user.get_activation_url(), follow=True)
652
        self.assertRedirects(r, reverse('index'))
653
        # user sees that account is pending approval from admins
654
        self.assertContains(r, messages.NOTIFICATION_SENT)
655
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
656

    
657
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
658
        result = backend.handle_moderation(user)
659
        backend.send_result_notifications(result, user)
660
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
661
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
662

    
663
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
664
        r = self.client.get('/im/profile', follow=True)
665
        self.assertFalse(r.context['request'].user.is_authenticated())
666
        self.assertFalse('_pithos2_a' in self.client.cookies)
667
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
668

    
669
        user = AstakosUser.objects.get(pk=user.pk)
670
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
671
                                           'password': 'password'},
672
                             follow=True)
673
        # user activated and logged in, token cookie set
674
        self.assertTrue(r.context['request'].user.is_authenticated())
675
        self.assertTrue('_pithos2_a' in self.client.cookies)
676
        cookies = self.client.cookies
677
        self.assertTrue(quote(user.auth_token) in
678
                        cookies.get('_pithos2_a').value)
679
        r = self.client.get('/im/logout', follow=True)
680
        r = self.client.get('/im/')
681
        # user logged out, token cookie removed
682
        self.assertFalse(r.context['request'].user.is_authenticated())
683
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
684

    
685
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
686
        del self.client.cookies['_pithos2_a']
687

    
688
        # user can login
689
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
690
                                           'password': 'password'},
691
                             follow=True)
692
        self.assertTrue(r.context['request'].user.is_authenticated())
693
        self.assertTrue('_pithos2_a' in self.client.cookies)
694
        cookies = self.client.cookies
695
        self.assertTrue(quote(user.auth_token) in
696
                        cookies.get('_pithos2_a').value)
697
        self.client.get('/im/logout', follow=True)
698

    
699
        # user forgot password
700
        old_pass = user.password
701
        r = self.client.get('/im/local/password_reset')
702
        self.assertEqual(r.status_code, 200)
703
        r = self.client.post('/im/local/password_reset', {'email':
704
                                                          'kpap@synnefo.org'})
705
        self.assertEqual(r.status_code, 302)
706
        # email sent
707
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
708

    
709
        # user visits change password link
710
        user = AstakosUser.objects.get(pk=user.pk)
711
        r = self.client.get(user.get_password_reset_url())
712
        r = self.client.post(user.get_password_reset_url(),
713
                             {'new_password1': 'newpass',
714
                              'new_password2': 'newpass'})
715

    
716
        user = AstakosUser.objects.get(pk=user.pk)
717
        self.assertNotEqual(old_pass, user.password)
718

    
719
        # old pass is not usable
720
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
721
                                           'password': 'password'})
722
        self.assertContains(r, 'Please enter a correct username and password')
723
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
724
                                           'password': 'newpass'},
725
                             follow=True)
726
        self.assertTrue(r.context['request'].user.is_authenticated())
727
        self.client.logout()
728

    
729
        # tests of special local backends
730
        user = AstakosUser.objects.get(pk=user.pk)
731
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
732
        user.save()
733

    
734
        # non astakos local backends do not support password reset
735
        r = self.client.get('/im/local/password_reset')
736
        self.assertEqual(r.status_code, 200)
737
        r = self.client.post('/im/local/password_reset', {'email':
738
                                                          'kpap@synnefo.org'})
739
        # she can't because account is not active yet
740
        self.assertContains(r, "Changing password is not")
741

    
742

    
743
class UserActionsTests(TestCase):
744

    
745
    def test_email_change(self):
746
        # to test existing email validation
747
        get_local_user('existing@synnefo.org')
748

    
749
        # local user
750
        user = get_local_user('kpap@synnefo.org')
751

    
752
        # login as kpap
753
        self.client.login(username='kpap@synnefo.org', password='password')
754
        r = self.client.get('/im/profile', follow=True)
755
        user = r.context['request'].user
756
        self.assertTrue(user.is_authenticated())
757

    
758
        # change email is enabled
759
        r = self.client.get('/im/email_change')
760
        self.assertEqual(r.status_code, 200)
761
        self.assertFalse(user.email_change_is_pending())
762

    
763
        # request email change to an existing email fails
764
        data = {'new_email_address': 'existing@synnefo.org'}
765
        r = self.client.post('/im/email_change', data)
766
        self.assertContains(r, messages.EMAIL_USED)
767

    
768
        # proper email change
769
        data = {'new_email_address': 'kpap@gmail.com'}
770
        r = self.client.post('/im/email_change', data, follow=True)
771
        self.assertRedirects(r, '/im/profile')
772
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
773
        change1 = EmailChange.objects.get()
774

    
775
        # user sees a warning
776
        r = self.client.get('/im/email_change')
777
        self.assertEqual(r.status_code, 200)
778
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
779
        self.assertTrue(user.email_change_is_pending())
780

    
781
        # link was sent
782
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
783
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
784

    
785
        # proper email change
786
        data = {'new_email_address': 'kpap@yahoo.com'}
787
        r = self.client.post('/im/email_change', data, follow=True)
788
        self.assertRedirects(r, '/im/profile')
789
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
790
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
791
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
792
        change2 = EmailChange.objects.get()
793

    
794
        r = self.client.get(change1.get_url())
795
        self.assertEquals(r.status_code, 302)
796
        self.client.logout()
797

    
798
        r = self.client.post('/im/local?next=' + change2.get_url(),
799
                             {'username': 'kpap@synnefo.org',
800
                              'password': 'password',
801
                              'next': change2.get_url()},
802
                             follow=True)
803
        self.assertRedirects(r, '/im/profile')
804
        user = r.context['request'].user
805
        self.assertEquals(user.email, 'kpap@yahoo.com')
806
        self.assertEquals(user.username, 'kpap@yahoo.com')
807

    
808
        self.client.logout()
809
        r = self.client.post('/im/local?next=' + change2.get_url(),
810
                             {'username': 'kpap@synnefo.org',
811
                              'password': 'password',
812
                              'next': change2.get_url()},
813
                             follow=True)
814
        self.assertContains(r, "Please enter a correct username and password")
815
        self.assertEqual(user.emailchanges.count(), 0)
816

    
817

    
818
class TestAuthProviderViews(TestCase):
819

    
820
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
821
                         AUTOMODERATE_POLICY=True)
822
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
823
                 FORCE_PROFILE_UPDATE=False)
824
    def test_user(self):
825
        Profile = AuthProviderPolicyProfile
826
        Pending = PendingThirdPartyUser
827
        User = AstakosUser
828

    
829
        User.objects.create(email="newuser@synnefo.org")
830
        get_local_user("olduser@synnefo.org")
831
        cl_olduser = ShibbolethClient()
832
        get_local_user("olduser2@synnefo.org")
833
        ShibbolethClient()
834
        cl_newuser = ShibbolethClient()
835
        cl_newuser2 = Client()
836

    
837
        academic_group, created = Group.objects.get_or_create(
838
            name='academic-login')
839
        academic_users = academic_group.user_set
840
        assert created
841
        policy_only_academic = Profile.objects.add_policy('academic_strict',
842
                                                          'shibboleth',
843
                                                          academic_group,
844
                                                          exclusive=True,
845
                                                          login=False,
846
                                                          add=False)
847

    
848
        # new academic user
849
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
850
        cl_newuser.set_tokens(eppn="newusereppn")
851
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
852
        pending = Pending.objects.get()
853
        identifier = pending.third_party_identifier
854
        signup_data = {'third_party_identifier': identifier,
855
                       'first_name': 'Academic',
856
                       'third_party_token': pending.token,
857
                       'last_name': 'New User',
858
                       'provider': 'shibboleth'}
859
        r = cl_newuser.post('/im/signup', signup_data)
860
        self.assertContains(r, "This field is required", )
861
        signup_data['email'] = 'olduser@synnefo.org'
862
        r = cl_newuser.post('/im/signup', signup_data)
863
        self.assertContains(r, "already an account with this email", )
864
        signup_data['email'] = 'newuser@synnefo.org'
865
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
866
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
867
        self.assertEqual(r.status_code, 404)
868
        newuser = User.objects.get(email="newuser@synnefo.org")
869
        activation_link = newuser.get_activation_url()
870
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
871

    
872
        # new non-academic user
873
        signup_data = {'first_name': 'Non Academic',
874
                       'last_name': 'New User',
875
                       'provider': 'local',
876
                       'password1': 'password',
877
                       'password2': 'password'}
878
        signup_data['email'] = 'olduser@synnefo.org'
879
        r = cl_newuser2.post('/im/signup', signup_data)
880
        self.assertContains(r, 'There is already an account with this '
881
                               'email address')
882
        signup_data['email'] = 'newuser@synnefo.org'
883
        r = cl_newuser2.post('/im/signup/', signup_data)
884
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
885
        r = self.client.get(activation_link, follow=True)
886
        self.assertEqual(r.status_code, 404)
887
        newuser = User.objects.get(email="newuser@synnefo.org")
888
        self.assertTrue(newuser.activation_sent)
889

    
890
        # activation sent, user didn't open verification url so additional
891
        # registrations invalidate the previous signups.
892
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
893
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
894
        pending = Pending.objects.get()
895
        identifier = pending.third_party_identifier
896
        signup_data = {'third_party_identifier': identifier,
897
                       'first_name': 'Academic',
898
                       'third_party_token': pending.token,
899
                       'last_name': 'New User',
900
                       'provider': 'shibboleth'}
901
        signup_data['email'] = 'newuser@synnefo.org'
902
        r = cl_newuser.post('/im/signup', signup_data)
903
        self.assertEqual(r.status_code, 302)
904
        newuser = User.objects.get(email="newuser@synnefo.org")
905
        self.assertTrue(newuser.activation_sent)
906
        activation_link = newuser.get_activation_url()
907
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
908
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
909
        self.assertRedirects(r, '/im/landing')
910
        newuser = User.objects.get(email="newuser@synnefo.org")
911
        self.assertEqual(newuser.is_active, True)
912
        self.assertEqual(newuser.email_verified, True)
913
        cl_newuser.logout()
914

    
915
        # cannot reactivate if suspended
916
        newuser.is_active = False
917
        newuser.save()
918
        r = cl_newuser.get(newuser.get_activation_url())
919
        newuser = User.objects.get(email="newuser@synnefo.org")
920
        self.assertFalse(newuser.is_active)
921

    
922
        # release suspension
923
        newuser.is_active = True
924
        newuser.save()
925

    
926
        cl_newuser.get('/im/login/shibboleth?', follow=True)
927
        local = auth.get_provider('local', newuser)
928
        self.assertEqual(local.get_add_policy, False)
929
        self.assertEqual(local.get_login_policy, False)
930
        r = cl_newuser.get(local.get_add_url, follow=True)
931
        self.assertRedirects(r, '/im/profile')
932
        self.assertContains(r, 'disabled for your')
933

    
934
        cl_olduser.login(username='olduser@synnefo.org', password="password")
935
        r = cl_olduser.get('/im/profile', follow=True)
936
        self.assertEqual(r.status_code, 200)
937
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
938
        self.assertContains(r, 'Your request is missing a unique token')
939
        cl_olduser.set_tokens(eppn="newusereppn")
940
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
941
        self.assertContains(r, 'is already assigned to another user')
942
        cl_olduser.set_tokens(eppn="oldusereppn")
943
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
944
        self.assertContains(r, 'Academic login enabled for this account')
945

    
946
        user = User.objects.get(email="olduser@synnefo.org")
947
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
948
        local_provider = user.get_auth_provider('local')
949
        self.assertEqual(shib_provider.get_remove_policy, True)
950
        self.assertEqual(local_provider.get_remove_policy, True)
951

    
952
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
953
                                                          'shibboleth',
954
                                                          academic_group,
955
                                                          remove=False)
956
        user.groups.add(academic_group)
957
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
958
        local_provider = user.get_auth_provider('local')
959
        self.assertEqual(shib_provider.get_remove_policy, False)
960
        self.assertEqual(local_provider.get_remove_policy, True)
961
        self.assertEqual(local_provider.get_login_policy, False)
962

    
963
        cl_olduser.logout()
964
        login_data = {'username': 'olduser@synnefo.org', 'password': 'password'}
965
        r = cl_olduser.post('/im/local', login_data, follow=True)
966
        self.assertContains(r, "href='/im/login/shibboleth'>Academic login")
967
        Group.objects.all().delete()
968

    
969

    
970
class TestAuthProvidersAPI(TestCase):
971
    """
972
    Test auth_providers module API
973
    """
974

    
975
    @im_settings(IM_MODULES=['local', 'shibboleth'])
976
    def test_create(self):
977
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
978
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
979

    
980
        module = 'shibboleth'
981
        identifier = 'SHIB_UUID'
982
        provider_params = {
983
            'affiliation': 'UNIVERSITY',
984
            'info': {'age': 27}
985
        }
986
        provider = auth.get_provider(module, user2, identifier,
987
                                     **provider_params)
988
        provider.add_to_user()
989
        provider = auth.get_provider(module, user, identifier,
990
                                     **provider_params)
991
        provider.add_to_user()
992
        user.email_verified = True
993
        user.save()
994
        self.assertRaises(Exception, provider.add_to_user)
995
        provider = user.get_auth_provider(module, identifier)
996
        self.assertEqual(user.get_auth_provider(
997
            module, identifier)._instance.info.get('age'), 27)
998

    
999
        module = 'local'
1000
        identifier = None
1001
        provider_params = {'auth_backend': 'ldap', 'info':
1002
                          {'office': 'A1'}}
1003
        provider = auth.get_provider(module, user, identifier,
1004
                                     **provider_params)
1005
        provider.add_to_user()
1006
        self.assertFalse(provider.get_add_policy)
1007
        self.assertRaises(Exception, provider.add_to_user)
1008

    
1009
        shib = user.get_auth_provider('shibboleth',
1010
                                      'SHIB_UUID')
1011
        self.assertTrue(shib.get_remove_policy)
1012

    
1013
        local = user.get_auth_provider('local')
1014
        self.assertTrue(local.get_remove_policy)
1015

    
1016
        local.remove_from_user()
1017
        self.assertFalse(shib.get_remove_policy)
1018
        self.assertRaises(Exception, shib.remove_from_user)
1019

    
1020
        provider = user.get_auth_providers()[0]
1021
        self.assertRaises(Exception, provider.add_to_user)
1022

    
1023
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1024
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
1025
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
1026
                                                 'group2'])
1027
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
1028
                        CREATION_GROUPS_POLICY=['localgroup-create',
1029
                                                'group-create'])
1030
    def test_add_groups(self):
1031
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
1032
        provider = auth.get_provider('shibboleth', user, 'test123')
1033
        provider.add_to_user()
1034
        user = AstakosUser.objects.get()
1035
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1036
                              sorted([u'group1', u'group2', u'group-create']))
1037

    
1038
        local = auth.get_provider('local', user)
1039
        local.add_to_user()
1040
        provider = user.get_auth_provider('shibboleth')
1041
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1042
        provider.remove_from_user()
1043
        user = AstakosUser.objects.get()
1044
        self.assertEqual(len(user.get_auth_providers()), 1)
1045
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1046
                              sorted([u'group-create', u'localgroup']))
1047

    
1048
        local = user.get_auth_provider('local')
1049
        self.assertRaises(Exception, local.remove_from_user)
1050
        provider = auth.get_provider('shibboleth', user, 'test123')
1051
        provider.add_to_user()
1052
        user = AstakosUser.objects.get()
1053
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1054
                              sorted([u'group-create', u'group1', u'group2',
1055
                               u'localgroup']))
1056
        Group.objects.all().delete()
1057

    
1058

    
1059

    
1060
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1061
    def test_policies(self):
1062
        group_old, created = Group.objects.get_or_create(name='olduser')
1063

    
1064
        astakos_settings.MODERATION_ENABLED = True
1065
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1066
            ['academic-user']
1067
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1068
            ['google-user']
1069

    
1070
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
1071
        user.groups.add(group_old)
1072
        user.add_auth_provider('local')
1073

    
1074
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
1075
        user2.add_auth_provider('shibboleth', identifier='shibid')
1076

    
1077
        user3 = AstakosUser.objects.create(email="kpap3@synnefo.org")
1078
        user3.groups.add(group_old)
1079
        user3.add_auth_provider('local')
1080
        user3.add_auth_provider('shibboleth', identifier='1234')
1081

    
1082
        self.assertTrue(user2.groups.get(name='academic-user'))
1083
        self.assertFalse(user2.groups.filter(name='olduser').count())
1084

    
1085
        local = auth_providers.get_provider('local')
1086
        self.assertTrue(local.get_add_policy)
1087

    
1088
        academic_group = Group.objects.get(name='academic-user')
1089
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1090
                                                     academic_group,
1091
                                                     exclusive=True,
1092
                                                     add=False,
1093
                                                     login=False)
1094
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1095
                                                     academic_group,
1096
                                                     exclusive=True,
1097
                                                     login=False,
1098
                                                     add=False)
1099
        # no duplicate entry gets created
1100
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1101

    
1102
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1103
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1104
                                                     user2,
1105
                                                     remove=False)
1106
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1107

    
1108
        local = auth_providers.get_provider('local', user2)
1109
        google = auth_providers.get_provider('google', user2)
1110
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1111
        self.assertTrue(shibboleth.get_login_policy)
1112
        self.assertFalse(shibboleth.get_remove_policy)
1113
        self.assertFalse(local.get_add_policy)
1114
        self.assertFalse(local.get_add_policy)
1115
        self.assertFalse(google.get_add_policy)
1116

    
1117
        user2.groups.remove(Group.objects.get(name='academic-user'))
1118
        self.assertTrue(local.get_add_policy)
1119
        self.assertTrue(google.get_add_policy)
1120
        user2.groups.add(Group.objects.get(name='academic-user'))
1121

    
1122
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1123
                                                     user2,
1124
                                                     exclusive=True,
1125
                                                     add=True)
1126
        self.assertTrue(local.get_add_policy)
1127
        self.assertTrue(google.get_add_policy)
1128

    
1129
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1130
        self.assertFalse(local.get_automoderate_policy)
1131
        self.assertFalse(google.get_automoderate_policy)
1132
        self.assertTrue(shibboleth.get_automoderate_policy)
1133

    
1134
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1135
                  'GOOGLE_ADD_GROUPS_POLICY']:
1136
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1137

    
1138

    
1139
    @shibboleth_settings(CREATE_POLICY=True)
1140
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1141
    def test_create_http(self):
1142
        # this should be wrapped inside a transaction
1143
        user = AstakosUser(email="test@test.com")
1144
        user.save()
1145
        provider = auth_providers.get_provider('shibboleth', user,
1146
                                               'test@academia.test')
1147
        provider.add_to_user()
1148
        user.get_auth_provider('shibboleth', 'test@academia.test')
1149
        provider = auth_providers.get_provider('local', user)
1150
        provider.add_to_user()
1151
        user.get_auth_provider('local')
1152

    
1153
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1154
        user = AstakosUser(email="test2@test.com")
1155
        user.save()
1156
        provider = auth_providers.get_provider('shibboleth', user,
1157
                                               'test@shibboleth.com',
1158
                                               **{'info': {'name':
1159
                                                                'User Test'}})
1160
        self.assertFalse(provider.get_create_policy)
1161
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1162
        self.assertTrue(provider.get_create_policy)
1163
        academic = provider.add_to_user()
1164

    
1165
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1166
    @shibboleth_settings(LIMIT_POLICY=2)
1167
    def test_policies(self):
1168
        user = get_local_user('kpap@synnefo.org')
1169
        user.add_auth_provider('shibboleth', identifier='1234')
1170
        user.add_auth_provider('shibboleth', identifier='12345')
1171

    
1172
        # default limit is 1
1173
        local = user.get_auth_provider('local')
1174
        self.assertEqual(local.get_add_policy, False)
1175

    
1176
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1177
        academic = user.get_auth_provider('shibboleth',
1178
                                          identifier='1234')
1179
        self.assertEqual(academic.get_add_policy, False)
1180
        newacademic = auth_providers.get_provider('shibboleth', user,
1181
                                                  identifier='123456')
1182
        self.assertEqual(newacademic.get_add_policy, True)
1183
        user.add_auth_provider('shibboleth', identifier='123456')
1184
        self.assertEqual(academic.get_add_policy, False)
1185
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1186

    
1187
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1188
    @shibboleth_settings(LIMIT_POLICY=2)
1189
    def test_messages(self):
1190
        user = get_local_user('kpap@synnefo.org')
1191
        user.add_auth_provider('shibboleth', identifier='1234')
1192
        user.add_auth_provider('shibboleth', identifier='12345')
1193
        provider = auth_providers.get_provider('shibboleth')
1194
        self.assertEqual(provider.get_message('title'), 'Academic')
1195
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1196
        # regenerate messages cache
1197
        provider = auth_providers.get_provider('shibboleth')
1198
        self.assertEqual(provider.get_message('title'), 'New title')
1199
        self.assertEqual(provider.get_message('login_title'),
1200
                         'New title LOGIN')
1201
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1202
        self.assertEqual(provider.get_module_icon,
1203
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1204
        self.assertEqual(provider.get_module_medium_icon,
1205
                         settings.MEDIA_URL +
1206
                         'im/auth/icons-medium/shibboleth.png')
1207

    
1208
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1209
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1210
        self.assertEqual(provider.get_method_details_msg,
1211
                         'Account: 12345')
1212
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1213
        self.assertEqual(provider.get_method_details_msg,
1214
                         'Account: 1234')
1215

    
1216
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1217
        self.assertEqual(provider.get_not_active_msg,
1218
                         "'Academic login' is disabled.")
1219

    
1220
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1221
    @shibboleth_settings(LIMIT_POLICY=2)
1222
    def test_templates(self):
1223
        user = get_local_user('kpap@synnefo.org')
1224
        user.add_auth_provider('shibboleth', identifier='1234')
1225
        user.add_auth_provider('shibboleth', identifier='12345')
1226

    
1227
        provider = auth_providers.get_provider('shibboleth')
1228
        self.assertEqual(provider.get_template('login'),
1229
                         'im/auth/shibboleth_login.html')
1230
        provider = auth_providers.get_provider('google')
1231
        self.assertEqual(provider.get_template('login'),
1232
                         'im/auth/generic_login.html')
1233

    
1234

    
1235
class TestActivationBackend(TestCase):
1236

    
1237
    def setUp(self):
1238
        # dummy call to pass through logging middleware
1239
        self.client.get('/im/')
1240

    
1241
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1242
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1243
    def test_policies(self):
1244
        backend = activation_backends.get_backend()
1245

    
1246
        # email matches RE_USER_EMAIL_PATTERNS
1247
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1248
                               is_active=False, email_verified=False)
1249
        backend.handle_verification(user1, user1.verification_code)
1250
        self.assertEqual(user1.accepted_policy, 'email')
1251

    
1252
        # manually moderated
1253
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1254
                               is_active=False, email_verified=False)
1255

    
1256
        backend.handle_verification(user2, user2.verification_code)
1257
        self.assertEqual(user2.moderated, False)
1258
        backend.handle_moderation(user2)
1259
        self.assertEqual(user2.moderated, True)
1260
        self.assertEqual(user2.accepted_policy, 'manual')
1261

    
1262
        # autoaccept due to provider automoderate policy
1263
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1264
                               is_active=False, email_verified=False)
1265
        user3.auth_providers.all().delete()
1266
        user3.add_auth_provider('shibboleth', identifier='shib123')
1267
        backend.handle_verification(user3, user3.verification_code)
1268
        self.assertEqual(user3.moderated, True)
1269
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1270

    
1271
    @im_settings(MODERATION_ENABLED=False,
1272
                 MANAGERS=(('Manager',
1273
                            'manager@synnefo.org'),),
1274
                 HELPDESK=(('Helpdesk',
1275
                            'helpdesk@synnefo.org'),),
1276
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1277
    def test_without_moderation(self):
1278
        backend = activation_backends.get_backend()
1279
        form = backend.get_signup_form('local')
1280
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1281

    
1282
        user_data = {
1283
            'email': 'kpap@synnefo.org',
1284
            'first_name': 'Kostas Papas',
1285
            'password1': '123',
1286
            'password2': '123'
1287
        }
1288
        form = backend.get_signup_form('local', user_data)
1289
        user = form.save(commit=False)
1290
        form.store_user(user)
1291
        self.assertEqual(user.is_active, False)
1292
        self.assertEqual(user.email_verified, False)
1293

    
1294
        # step one, registration
1295
        result = backend.handle_registration(user)
1296
        user = AstakosUser.objects.get()
1297
        self.assertEqual(user.is_active, False)
1298
        self.assertEqual(user.email_verified, False)
1299
        self.assertTrue(user.verification_code)
1300
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1301
        backend.send_result_notifications(result, user)
1302
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1303
        self.assertEqual(len(mail.outbox), 1)
1304

    
1305
        # step two, verify email (automatically
1306
        # moderates/accepts user, since moderation is disabled)
1307
        user = AstakosUser.objects.get()
1308
        valid_code = user.verification_code
1309

    
1310
        # test invalid code
1311
        result = backend.handle_verification(user, valid_code)
1312
        backend.send_result_notifications(result, user)
1313
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1314
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1315
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1316
        # verification + activated + greeting = 3
1317
        self.assertEqual(len(mail.outbox), 3)
1318
        user = AstakosUser.objects.get()
1319
        self.assertEqual(user.is_active, True)
1320
        self.assertEqual(user.moderated, True)
1321
        self.assertTrue(user.moderated_at)
1322
        self.assertEqual(user.email_verified, True)
1323
        self.assertTrue(user.activation_sent)
1324

    
1325
    @im_settings(MODERATION_ENABLED=True,
1326
                 MANAGERS=(('Manager',
1327
                            'manager@synnefo.org'),),
1328
                 HELPDESK=(('Helpdesk',
1329
                            'helpdesk@synnefo.org'),),
1330
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1331
    def test_with_moderation(self):
1332

    
1333
        backend = activation_backends.get_backend()
1334
        form = backend.get_signup_form('local')
1335
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1336

    
1337
        user_data = {
1338
            'email': 'kpap@synnefo.org',
1339
            'first_name': 'Kostas Papas',
1340
            'password1': '123',
1341
            'password2': '123'
1342
        }
1343
        form = backend.get_signup_form(provider='local',
1344
                                       initial_data=user_data)
1345
        user = form.save(commit=False)
1346
        form.store_user(user)
1347
        self.assertEqual(user.is_active, False)
1348
        self.assertEqual(user.email_verified, False)
1349

    
1350
        # step one, registration
1351
        result = backend.handle_registration(user)
1352
        user = AstakosUser.objects.get()
1353
        self.assertEqual(user.is_active, False)
1354
        self.assertEqual(user.email_verified, False)
1355
        self.assertTrue(user.verification_code)
1356
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1357
        backend.send_result_notifications(result, user)
1358
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1359
        self.assertEqual(len(mail.outbox), 1)
1360

    
1361
        # step two, verifying email
1362
        user = AstakosUser.objects.get()
1363
        valid_code = user.verification_code
1364
        invalid_code = user.verification_code + 'invalid'
1365

    
1366
        # test invalid code
1367
        result = backend.handle_verification(user, invalid_code)
1368
        self.assertEqual(result.status, backend.Result.ERROR)
1369
        backend.send_result_notifications(result, user)
1370
        user = AstakosUser.objects.get()
1371
        self.assertEqual(user.is_active, False)
1372
        self.assertEqual(user.moderated, False)
1373
        self.assertEqual(user.moderated_at, None)
1374
        self.assertEqual(user.email_verified, False)
1375
        self.assertTrue(user.activation_sent)
1376

    
1377
        # test valid code
1378
        user = AstakosUser.objects.get()
1379
        result = backend.handle_verification(user, valid_code)
1380
        backend.send_result_notifications(result, user)
1381
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1382
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1383
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1384
        self.assertEqual(len(mail.outbox), 2)
1385
        user = AstakosUser.objects.get()
1386
        self.assertEqual(user.moderated, False)
1387
        self.assertEqual(user.moderated_at, None)
1388
        self.assertEqual(user.email_verified, True)
1389
        self.assertTrue(user.activation_sent)
1390

    
1391
        # test code reuse
1392
        result = backend.handle_verification(user, valid_code)
1393
        self.assertEqual(result.status, backend.Result.ERROR)
1394
        user = AstakosUser.objects.get()
1395
        self.assertEqual(user.is_active, False)
1396
        self.assertEqual(user.moderated, False)
1397
        self.assertEqual(user.moderated_at, None)
1398
        self.assertEqual(user.email_verified, True)
1399
        self.assertTrue(user.activation_sent)
1400

    
1401
        # valid code on verified user
1402
        user = AstakosUser.objects.get()
1403
        valid_code = user.verification_code
1404
        result = backend.handle_verification(user, valid_code)
1405
        self.assertEqual(result.status, backend.Result.ERROR)
1406

    
1407
        # step three, moderation user
1408
        user = AstakosUser.objects.get()
1409
        result = backend.handle_moderation(user)
1410
        backend.send_result_notifications(result, user)
1411

    
1412
        user = AstakosUser.objects.get()
1413
        self.assertEqual(user.is_active, True)
1414
        self.assertEqual(user.moderated, True)
1415
        self.assertTrue(user.moderated_at)
1416
        self.assertEqual(user.email_verified, True)
1417
        self.assertTrue(user.activation_sent)
1418

    
1419

    
1420
class TestProjects(TestCase):
1421
    """
1422
    Test projects.
1423
    """
1424
    def setUp(self):
1425
        # astakos resources
1426
        self.astakos_service = Service.objects.create(name="astakos",
1427
                                                      api_url="/astakos/api/")
1428
        self.resource = Resource.objects.create(name="astakos.pending_app",
1429
                                                uplimit=0,
1430
                                                allow_in_projects=False,
1431
                                                service=self.astakos_service)
1432

    
1433
        # custom service resources
1434
        self.service = Service.objects.create(name="service1",
1435
                                              api_url="http://service.api")
1436
        self.resource = Resource.objects.create(name="service1.resource",
1437
                                                uplimit=100,
1438
                                                service=self.service)
1439
        self.admin = get_local_user("projects-admin@synnefo.org")
1440
        self.admin.uuid = 'uuid1'
1441
        self.admin.save()
1442

    
1443
        self.user = get_local_user("user@synnefo.org")
1444
        self.member = get_local_user("member@synnefo.org")
1445
        self.member2 = get_local_user("member2@synnefo.org")
1446

    
1447
        self.admin_client = get_user_client("projects-admin@synnefo.org")
1448
        self.user_client = get_user_client("user@synnefo.org")
1449
        self.member_client = get_user_client("member@synnefo.org")
1450
        self.member2_client = get_user_client("member2@synnefo.org")
1451

    
1452
        quotas.qh_sync_users(AstakosUser.objects.all())
1453

    
1454
    @im_settings(PROJECT_ADMINS=['uuid1'])
1455
    def test_application_limit(self):
1456
        # user cannot create a project
1457
        r = self.user_client.get(reverse('project_add'), follow=True)
1458
        self.assertRedirects(r, reverse('project_list'))
1459
        self.assertContains(r, "You are not allowed to create a new project")
1460

    
1461
        # but admin can
1462
        r = self.admin_client.get(reverse('project_add'), follow=True)
1463
        self.assertRedirects(r, reverse('project_add'))
1464

    
1465
    @im_settings(PROJECT_ADMINS=['uuid1'])
1466
    def test_allow_in_project(self):
1467
        dfrom = datetime.now()
1468
        dto = datetime.now() + timedelta(days=30)
1469

    
1470
        # astakos.pending_uplimit allow_in_project flag is False
1471
        # we shouldn't be able to create a project application using this
1472
        # resource.
1473
        application_data = {
1474
            'name': 'project.synnefo.org',
1475
            'homepage': 'https://www.synnefo.org',
1476
            'start_date': dfrom.strftime("%Y-%m-%d"),
1477
            'end_date': dto.strftime("%Y-%m-%d"),
1478
            'member_join_policy': 2,
1479
            'member_leave_policy': 1,
1480
            'service1.resource_uplimit': 100,
1481
            'is_selected_service1.resource': "1",
1482
            'astakos.pending_app_uplimit': 100,
1483
            'is_selected_accounts': "1",
1484
            'user': self.user.pk
1485
        }
1486
        form = forms.ProjectApplicationForm(data=application_data)
1487
        # form is invalid
1488
        self.assertEqual(form.is_valid(), False)
1489

    
1490
        del application_data['astakos.pending_app_uplimit']
1491
        del application_data['is_selected_accounts']
1492
        form = forms.ProjectApplicationForm(data=application_data)
1493
        self.assertEqual(form.is_valid(), True)
1494

    
1495
    @im_settings(PROJECT_ADMINS=['uuid1'])
1496
    def test_applications(self):
1497
        # let user have 2 pending applications
1498
        quotas.add_base_quota(self.user, 'astakos.pending_app', 2)
1499

    
1500
        r = self.user_client.get(reverse('project_add'), follow=True)
1501
        self.assertRedirects(r, reverse('project_add'))
1502

    
1503
        # user fills the project application form
1504
        post_url = reverse('project_add') + '?verify=1'
1505
        dfrom = datetime.now()
1506
        dto = datetime.now() + timedelta(days=30)
1507
        application_data = {
1508
            'name': 'project.synnefo.org',
1509
            'homepage': 'https://www.synnefo.org',
1510
            'start_date': dfrom.strftime("%Y-%m-%d"),
1511
            'end_date': dto.strftime("%Y-%m-%d"),
1512
            'member_join_policy': 2,
1513
            'member_leave_policy': 1,
1514
            'service1.resource_uplimit': 100,
1515
            'is_selected_service1.resource': "1",
1516
            'user': self.user.pk
1517
        }
1518
        r = self.user_client.post(post_url, data=application_data, follow=True)
1519
        self.assertEqual(r.status_code, 200)
1520
        self.assertEqual(r.context['form'].is_valid(), True)
1521

    
1522
        # confirm request
1523
        post_url = reverse('project_add') + '?verify=0&edit=0'
1524
        r = self.user_client.post(post_url, data=application_data, follow=True)
1525
        self.assertContains(r, "The project application has been received")
1526
        self.assertRedirects(r, reverse('project_list'))
1527
        self.assertEqual(ProjectApplication.objects.count(), 1)
1528
        app1_id = ProjectApplication.objects.filter().order_by('pk')[0].pk
1529

    
1530
        # create another one
1531
        application_data['name'] = 'project2.synnefo.org'
1532
        r = self.user_client.post(post_url, data=application_data, follow=True)
1533
        app2_id = ProjectApplication.objects.filter().order_by('pk')[1].pk
1534

    
1535
        # no more applications (LIMIT is 2)
1536
        r = self.user_client.get(reverse('project_add'), follow=True)
1537
        self.assertRedirects(r, reverse('project_list'))
1538
        self.assertContains(r, "You are not allowed to create a new project")
1539

    
1540
        # login
1541
        self.admin_client.get(reverse("edit_profile"))
1542
        # admin approves
1543
        r = self.admin_client.post(reverse('project_app_approve',
1544
                                           kwargs={'application_id': app1_id}),
1545
                                   follow=True)
1546
        self.assertEqual(r.status_code, 200)
1547

    
1548
        # project created
1549
        self.assertEqual(Project.objects.count(), 1)
1550

    
1551
        # login
1552
        self.member_client.get(reverse("edit_profile"))
1553
        # cannot join app2 (not approved yet)
1554
        join_url = reverse("project_join", kwargs={'chain_id': app2_id})
1555
        r = self.member_client.post(join_url, follow=True)
1556
        self.assertEqual(r.status_code, 403)
1557

    
1558
        # can join app1
1559
        self.member_client.get(reverse("edit_profile"))
1560
        join_url = reverse("project_join", kwargs={'chain_id': app1_id})
1561
        r = self.member_client.post(join_url, follow=True)
1562
        self.assertEqual(r.status_code, 200)
1563

    
1564
        memberships = ProjectMembership.objects.all()
1565
        self.assertEqual(len(memberships), 1)
1566
        memb_id = memberships[0].id
1567

    
1568
        reject_member_url = reverse('project_reject_member',
1569
                                    kwargs={'chain_id': app1_id, 'memb_id':
1570
                                            memb_id})
1571
        accept_member_url = reverse('project_accept_member',
1572
                                    kwargs={'chain_id': app1_id, 'memb_id':
1573
                                            memb_id})
1574

    
1575
        # only project owner is allowed to reject
1576
        r = self.member_client.post(reject_member_url, follow=True)
1577
        self.assertContains(r, "You do not have the permissions")
1578
        self.assertEqual(r.status_code, 200)
1579

    
1580
        # user (owns project) rejects membership
1581
        r = self.user_client.post(reject_member_url, follow=True)
1582
        self.assertEqual(ProjectMembership.objects.count(), 0)
1583

    
1584
        # user rejoins
1585
        self.member_client.get(reverse("edit_profile"))
1586
        join_url = reverse("project_join", kwargs={'chain_id': app1_id})
1587
        r = self.member_client.post(join_url, follow=True)
1588
        self.assertEqual(r.status_code, 200)
1589
        self.assertEqual(ProjectMembership.objects.count(), 1)
1590

    
1591
        # user (owns project) accepts membership
1592
        r = self.user_client.post(accept_member_url, follow=True)
1593
        self.assertEqual(ProjectMembership.objects.count(), 1)
1594
        membership = ProjectMembership.objects.get()
1595
        self.assertEqual(membership.state, ProjectMembership.ACCEPTED)
1596

    
1597
        user_quotas = quotas.get_users_quotas([self.member])
1598
        resource = 'service1.resource'
1599
        newlimit = user_quotas[self.member.uuid]['system'][resource]['limit']
1600
        # 100 from initial uplimit + 100 from project
1601
        self.assertEqual(newlimit, 200)
1602

    
1603
        remove_member_url = reverse('project_remove_member',
1604
                                    kwargs={'chain_id': app1_id, 'memb_id':
1605
                                            membership.id})
1606
        r = self.user_client.post(remove_member_url, follow=True)
1607
        self.assertEqual(r.status_code, 200)
1608

    
1609
        user_quotas = quotas.get_users_quotas([self.member])
1610
        resource = 'service1.resource'
1611
        newlimit = user_quotas[self.member.uuid]['system'][resource]['limit']
1612
        # 200 - 100 from project
1613
        self.assertEqual(newlimit, 100)
1614

    
1615

    
1616
ROOT = '/astakos/api/'
1617
u = lambda url: ROOT + url
1618

    
1619

    
1620
class QuotaAPITest(TestCase):
1621
    def test_0(self):
1622
        client = Client()
1623
        # custom service resources
1624
        service1 = Service.objects.create(
1625
            name="service1", api_url="http://service1.api")
1626
        resource11 = {"name": "service1.resource11",
1627
                      "desc": "resource11 desc",
1628
                      "allow_in_projects": True}
1629
        r, _ = resources.add_resource(service1, resource11)
1630
        resources.update_resource(r, 100)
1631
        resource12 = {"name": "service1.resource12",
1632
                      "desc": "resource11 desc",
1633
                      "unit": "bytes"}
1634
        r, _ = resources.add_resource(service1, resource12)
1635
        resources.update_resource(r, 1024)
1636

    
1637
        # create user
1638
        user = get_local_user('test@grnet.gr')
1639
        quotas.qh_sync_user(user)
1640

    
1641
        # create another service
1642
        service2 = Service.objects.create(
1643
            name="service2", api_url="http://service2.api")
1644
        resource21 = {"name": "service2.resource21",
1645
                      "desc": "resource11 desc",
1646
                      "allow_in_projects": False}
1647
        r, _ = resources.add_resource(service2, resource21)
1648
        resources.update_resource(r, 3)
1649

    
1650
        resource_names = [r['name'] for r in
1651
                          [resource11, resource12, resource21]]
1652

    
1653
        # get resources
1654
        r = client.get(u('resources'))
1655
        self.assertEqual(r.status_code, 200)
1656
        body = json.loads(r.content)
1657
        for name in resource_names:
1658
            assertIn(name, body)
1659

    
1660
        # get quota
1661
        r = client.get(u('quotas'))
1662
        self.assertEqual(r.status_code, 401)
1663

    
1664
        headers = {'HTTP_X_AUTH_TOKEN': user.auth_token}
1665
        r = client.get(u('quotas/'), **headers)
1666
        self.assertEqual(r.status_code, 200)
1667
        body = json.loads(r.content)
1668
        system_quota = body['system']
1669
        assertIn('system', body)
1670
        for name in resource_names:
1671
            assertIn(name, system_quota)
1672

    
1673
        r = client.get(u('service_quotas'))
1674
        self.assertEqual(r.status_code, 401)
1675

    
1676
        s1_headers = {'HTTP_X_AUTH_TOKEN': service1.auth_token}
1677
        r = client.get(u('service_quotas'), **s1_headers)
1678
        self.assertEqual(r.status_code, 200)
1679
        body = json.loads(r.content)
1680
        assertIn(user.uuid, body)
1681

    
1682
        r = client.get(u('commissions'), **s1_headers)
1683
        self.assertEqual(r.status_code, 200)
1684
        body = json.loads(r.content)
1685
        self.assertEqual(body, [])
1686

    
1687
        # issue some commissions
1688
        commission_request = {
1689
            "force": False,
1690
            "auto_accept": False,
1691
            "name": "my commission",
1692
            "provisions": [
1693
                {
1694
                    "holder": user.uuid,
1695
                    "source": "system",
1696
                    "resource": resource11['name'],
1697
                    "quantity": 1
1698
                },
1699
                {
1700
                    "holder": user.uuid,
1701
                    "source": "system",
1702
                    "resource": resource12['name'],
1703
                    "quantity": 30000
1704
                }]}
1705

    
1706
        post_data = json.dumps(commission_request)
1707
        r = client.post(u('commissions'), post_data,
1708
                        content_type='application/json', **s1_headers)
1709
        self.assertEqual(r.status_code, 413)
1710

    
1711
        commission_request = {
1712
            "force": False,
1713
            "auto_accept": False,
1714
            "name": "my commission",
1715
            "provisions": [
1716
                {
1717
                    "holder": user.uuid,
1718
                    "source": "system",
1719
                    "resource": resource11['name'],
1720
                    "quantity": 1
1721
                },
1722
                {
1723
                    "holder": user.uuid,
1724
                    "source": "system",
1725
                    "resource": resource12['name'],
1726
                    "quantity": 100
1727
                }]}
1728

    
1729
        post_data = json.dumps(commission_request)
1730
        r = client.post(u('commissions'), post_data,
1731
                        content_type='application/json', **s1_headers)
1732
        self.assertEqual(r.status_code, 201)
1733
        body = json.loads(r.content)
1734
        serial = body['serial']
1735
        self.assertEqual(serial, 1)
1736

    
1737
        post_data = json.dumps(commission_request)
1738
        r = client.post(u('commissions'), post_data,
1739
                        content_type='application/json', **s1_headers)
1740
        self.assertEqual(r.status_code, 201)
1741
        body = json.loads(r.content)
1742
        self.assertEqual(body['serial'], 2)
1743

    
1744
        post_data = json.dumps(commission_request)
1745
        r = client.post(u('commissions'), post_data,
1746
                        content_type='application/json', **s1_headers)
1747
        self.assertEqual(r.status_code, 201)
1748
        body = json.loads(r.content)
1749
        self.assertEqual(body['serial'], 3)
1750

    
1751
        r = client.get(u('commissions'), **s1_headers)
1752
        self.assertEqual(r.status_code, 200)
1753
        body = json.loads(r.content)
1754
        self.assertEqual(body, [1, 2, 3])
1755

    
1756
        r = client.get(u('commissions/' + str(serial)), **s1_headers)
1757
        self.assertEqual(r.status_code, 200)
1758
        body = json.loads(r.content)
1759
        self.assertEqual(body['serial'], serial)
1760
        assertIn('issue_time', body)
1761
        self.assertEqual(body['provisions'], commission_request['provisions'])
1762
        self.assertEqual(body['name'], commission_request['name'])
1763

    
1764
        r = client.get(u('service_quotas?user=' + user.uuid), **s1_headers)
1765
        self.assertEqual(r.status_code, 200)
1766
        body = json.loads(r.content)
1767
        user_quota = body[user.uuid]
1768
        system_quota = user_quota['system']
1769
        r11 = system_quota[resource11['name']]
1770
        self.assertEqual(r11['usage'], 3)
1771
        self.assertEqual(r11['pending'], 3)
1772

    
1773
        # resolve pending commissions
1774
        resolve_data = {
1775
            "accept": [1, 3],
1776
            "reject": [2, 3, 4],
1777
        }
1778
        post_data = json.dumps(resolve_data)
1779

    
1780
        r = client.post(u('commissions/action'), post_data,
1781
                        content_type='application/json', **s1_headers)
1782
        self.assertEqual(r.status_code, 200)
1783
        body = json.loads(r.content)
1784
        self.assertEqual(body['accepted'], [1])
1785
        self.assertEqual(body['rejected'], [2])
1786
        failed = body['failed']
1787
        self.assertEqual(len(failed), 2)
1788

    
1789
        r = client.get(u('commissions/' + str(serial)), **s1_headers)
1790
        self.assertEqual(r.status_code, 404)
1791

    
1792
        # auto accept
1793
        commission_request = {
1794
            "auto_accept": True,
1795
            "name": "my commission",
1796
            "provisions": [
1797
                {
1798
                    "holder": user.uuid,
1799
                    "source": "system",
1800
                    "resource": resource11['name'],
1801
                    "quantity": 1
1802
                },
1803
                {
1804
                    "holder": user.uuid,
1805
                    "source": "system",
1806
                    "resource": resource12['name'],
1807
                    "quantity": 100
1808
                }]}
1809

    
1810
        post_data = json.dumps(commission_request)
1811
        r = client.post(u('commissions'), post_data,
1812
                        content_type='application/json', **s1_headers)
1813
        self.assertEqual(r.status_code, 201)
1814
        body = json.loads(r.content)
1815
        serial = body['serial']
1816
        self.assertEqual(serial, 4)
1817

    
1818
        r = client.get(u('commissions/' + str(serial)), **s1_headers)
1819
        self.assertEqual(r.status_code, 404)
1820

    
1821
        # malformed
1822
        commission_request = {
1823
            "auto_accept": True,
1824
            "name": "my commission",
1825
            "provisions": [
1826
                {
1827
                    "holder": user.uuid,
1828
                    "source": "system",
1829
                    "resource": resource11['name'],
1830
                }
1831
            ]}
1832

    
1833
        post_data = json.dumps(commission_request)
1834
        r = client.post(u('commissions'), post_data,
1835
                        content_type='application/json', **s1_headers)
1836
        self.assertEqual(r.status_code, 400)
1837

    
1838
        commission_request = {
1839
            "auto_accept": True,
1840
            "name": "my commission",
1841
            "provisions": "dummy"}
1842

    
1843
        post_data = json.dumps(commission_request)
1844
        r = client.post(u('commissions'), post_data,
1845
                        content_type='application/json', **s1_headers)
1846
        self.assertEqual(r.status_code, 400)
1847

    
1848
        r = client.post(u('commissions'), commission_request,
1849
                        content_type='application/json', **s1_headers)
1850
        self.assertEqual(r.status_code, 400)
1851

    
1852
        # no holding
1853
        commission_request = {
1854
            "auto_accept": True,
1855
            "name": "my commission",
1856
            "provisions": [
1857
                {
1858
                    "holder": user.uuid,
1859
                    "source": "system",
1860
                    "resource": "non existent",
1861
                    "quantity": 1
1862
                },
1863
                {
1864
                    "holder": user.uuid,
1865
                    "source": "system",
1866
                    "resource": resource12['name'],
1867
                    "quantity": 100
1868
                }]}
1869

    
1870
        post_data = json.dumps(commission_request)
1871
        r = client.post(u('commissions'), post_data,
1872
                        content_type='application/json', **s1_headers)
1873
        self.assertEqual(r.status_code, 404)
1874

    
1875
        # release
1876
        commission_request = {
1877
            "provisions": [
1878
                {
1879
                    "holder": user.uuid,
1880
                    "source": "system",
1881
                    "resource": resource11['name'],
1882
                    "quantity": -1
1883
                }
1884
            ]}
1885

    
1886
        post_data = json.dumps(commission_request)
1887
        r = client.post(u('commissions'), post_data,
1888
                        content_type='application/json', **s1_headers)
1889
        self.assertEqual(r.status_code, 201)
1890
        body = json.loads(r.content)
1891
        serial = body['serial']
1892

    
1893
        accept_data = {'accept': ""}
1894
        post_data = json.dumps(accept_data)
1895
        r = client.post(u('commissions/' + str(serial) + '/action'), post_data,
1896
                        content_type='application/json', **s1_headers)
1897
        self.assertEqual(r.status_code, 200)
1898

    
1899
        reject_data = {'reject': ""}
1900
        post_data = json.dumps(accept_data)
1901
        r = client.post(u('commissions/' + str(serial) + '/action'), post_data,
1902
                        content_type='application/json', **s1_headers)
1903
        self.assertEqual(r.status_code, 404)
1904

    
1905
        # force
1906
        commission_request = {
1907
            "force": True,
1908
            "provisions": [
1909
                {
1910
                    "holder": user.uuid,
1911
                    "source": "system",
1912
                    "resource": resource11['name'],
1913
                    "quantity": 100
1914
                }]}
1915

    
1916
        post_data = json.dumps(commission_request)
1917
        r = client.post(u('commissions'), post_data,
1918
                        content_type='application/json', **s1_headers)
1919
        self.assertEqual(r.status_code, 201)
1920

    
1921
        commission_request = {
1922
            "force": True,
1923
            "provisions": [
1924
                {
1925
                    "holder": user.uuid,
1926
                    "source": "system",
1927
                    "resource": resource11['name'],
1928
                    "quantity": -200
1929
                }]}
1930

    
1931
        post_data = json.dumps(commission_request)
1932
        r = client.post(u('commissions'), post_data,
1933
                        content_type='application/json', **s1_headers)
1934
        self.assertEqual(r.status_code, 413)
1935

    
1936
        r = client.get(u('quotas'), **headers)
1937
        self.assertEqual(r.status_code, 200)
1938
        body = json.loads(r.content)
1939
        system_quota = body['system']
1940
        r11 = system_quota[resource11['name']]
1941
        self.assertEqual(r11['usage'], 102)
1942
        self.assertEqual(r11['pending'], 101)