Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests.py @ e7cb4085

History | View | Annotate | Download (80.5 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from contextlib import contextmanager
34

    
35
import copy
36
import datetime
37
import functools
38

    
39
from snf_django.utils.testing import with_settings, override_settings
40

    
41
from django.test import Client
42
from django.test import TransactionTestCase as TestCase
43
from django.core import mail
44
from django.http import SimpleCookie, HttpRequest, QueryDict
45
from django.utils.importlib import import_module
46
from django.utils import simplejson as json
47

    
48
from astakos.im.activation_backends import *
49
from astakos.im.target.shibboleth import Tokens as ShibbolethTokens
50
from astakos.im.models import *
51
from astakos.im import functions
52
from astakos.im import settings as astakos_settings
53
from astakos.im import forms
54
from astakos.im import activation_backends
55

    
56
from urllib import quote
57
from datetime import timedelta
58

    
59
from astakos.im import messages
60
from astakos.im import auth_providers
61
from astakos.im import quotas
62
from astakos.im import resources
63

    
64
from django.conf import settings
65

    
66

    
67
# set some common settings
68
astakos_settings.EMAILCHANGE_ENABLED = True
69
astakos_settings.RECAPTCHA_ENABLED = False
70

    
71
settings.LOGGING_SETUP['disable_existing_loggers'] = False
72

    
73
# shortcut decorators to override provider settings
74
# e.g. shibboleth_settings(ENABLED=True) will set
75
# ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_ENABLED = True in global synnefo settings
76
prefixes = {'providers': 'AUTH_PROVIDER_',
77
            'shibboleth': 'ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_',
78
            'local': 'ASTAKOS_AUTH_PROVIDER_LOCAL_'}
79
im_settings = functools.partial(with_settings, astakos_settings)
80
shibboleth_settings = functools.partial(with_settings,
81
                                        settings,
82
                                        prefix=prefixes['shibboleth'])
83
localauth_settings = functools.partial(with_settings, settings,
84
                                       prefix=prefixes['local'])
85

    
86

    
87
class AstakosTestClient(Client):
88
    pass
89

    
90

    
91
class ShibbolethClient(AstakosTestClient):
92
    """
93
    A shibboleth agnostic client.
94
    """
95
    VALID_TOKENS = filter(lambda x: not x.startswith("_"),
96
                          dir(ShibbolethTokens))
97

    
98
    def __init__(self, *args, **kwargs):
99
        self.tokens = kwargs.pop('tokens', {})
100
        super(ShibbolethClient, self).__init__(*args, **kwargs)
101

    
102
    def set_tokens(self, **kwargs):
103
        for key, value in kwargs.iteritems():
104
            key = 'SHIB_%s' % key.upper()
105
            if not key in self.VALID_TOKENS:
106
                raise Exception('Invalid shibboleth token')
107

    
108
            self.tokens[key] = value
109

    
110
    def unset_tokens(self, *keys):
111
        for key in keys:
112
            key = 'SHIB_%s' % param.upper()
113
            if key in self.tokens:
114
                del self.tokens[key]
115

    
116
    def reset_tokens(self):
117
        self.tokens = {}
118

    
119
    def get_http_token(self, key):
120
        http_header = getattr(ShibbolethTokens, key)
121
        return http_header
122

    
123
    def request(self, **request):
124
        """
125
        Transform valid shibboleth tokens to http headers
126
        """
127
        for token, value in self.tokens.iteritems():
128
            request[self.get_http_token(token)] = value
129

    
130
        for param in request.keys():
131
            key = 'SHIB_%s' % param.upper()
132
            if key in self.VALID_TOKENS:
133
                request[self.get_http_token(key)] = request[param]
134
                del request[param]
135

    
136
        return super(ShibbolethClient, self).request(**request)
137

    
138

    
139
def get_user_client(username, password="password"):
140
    client = Client()
141
    client.login(username=username, password=password)
142
    return client
143

    
144

    
145
def get_local_user(username, **kwargs):
146
        try:
147
            return AstakosUser.objects.get(email=username)
148
        except:
149
            user_params = {
150
                'username': username,
151
                'email': username,
152
                'is_active': True,
153
                'activation_sent': datetime.now(),
154
                'email_verified': True
155
            }
156
            user_params.update(kwargs)
157
            user = AstakosUser(**user_params)
158
            user.set_password(kwargs.get('password', 'password'))
159
            user.renew_verification_code()
160
            user.save()
161
            user.add_auth_provider('local', auth_backend='astakos')
162
            if kwargs.get('is_active', True):
163
                user.is_active = True
164
            else:
165
                user.is_active = False
166
            user.save()
167
            return user
168

    
169

    
170
def get_mailbox(email):
171
    mails = []
172
    for sent_email in mail.outbox:
173
        for recipient in sent_email.recipients():
174
            if email in recipient:
175
                mails.append(sent_email)
176
    return mails
177

    
178

    
179
class ShibbolethTests(TestCase):
180
    """
181
    Testing shibboleth authentication.
182
    """
183

    
184
    fixtures = ['groups']
185

    
186
    def setUp(self):
187
        self.client = ShibbolethClient()
188
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
189
        astakos_settings.MODERATION_ENABLED = True
190

    
191
    @im_settings(FORCE_PROFILE_UPDATE=False)
192
    def test_create_account(self):
193

    
194
        client = ShibbolethClient()
195

    
196
        # shibboleth views validation
197
        # eepn required
198
        r = client.get('/im/login/shibboleth?', follow=True)
199
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
200
            'domain': astakos_settings.BASEURL,
201
            'contact_email': settings.CONTACT_EMAIL
202
        })
203
        client.set_tokens(eppn="kpapeppn")
204

    
205
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
206
        # shibboleth user info required
207
        r = client.get('/im/login/shibboleth?', follow=True)
208
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
209
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
210

    
211
        # shibboleth logged us in
212
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
213
                          cn="Kostas Papadimitriou",
214
                          ep_affiliation="Test Affiliation")
215
        r = client.get('/im/login/shibboleth?', follow=True)
216
        token = PendingThirdPartyUser.objects.get().token
217
        self.assertRedirects(r, '/im/signup?third_party_token=%s' % token)
218
        self.assertEqual(r.status_code, 200)
219

    
220
        # a new pending user created
221
        pending_user = PendingThirdPartyUser.objects.get(
222
            third_party_identifier="kpapeppn")
223
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
224
        # keep the token for future use
225
        token = pending_user.token
226
        # from now on no shibboleth headers are sent to the server
227
        client.reset_tokens()
228

    
229
        # this is the old way, it should fail, to avoid pending user take over
230
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
231
        self.assertEqual(r.status_code, 404)
232

    
233
        # this is the signup unique url associated with the pending user
234
        # created
235
        r = client.get('/im/signup/?third_party_token=%s' % token)
236
        identifier = pending_user.third_party_identifier
237
        post_data = {'third_party_identifier': identifier,
238
                     'first_name': 'Kostas',
239
                     'third_party_token': token,
240
                     'last_name': 'Mitroglou',
241
                     'provider': 'shibboleth'}
242

    
243
        signup_url = reverse('signup')
244

    
245
        # invlid email
246
        post_data['email'] = 'kpap'
247
        r = client.post(signup_url, post_data)
248
        self.assertContains(r, token)
249

    
250
        # existing email
251
        existing_user = get_local_user('test@test.com')
252
        post_data['email'] = 'test@test.com'
253
        r = client.post(signup_url, post_data)
254
        self.assertContains(r, messages.EMAIL_USED)
255
        existing_user.delete()
256

    
257
        # and finally a valid signup
258
        post_data['email'] = 'kpap@synnefo.org'
259
        r = client.post(signup_url, post_data, follow=True)
260
        self.assertContains(r, messages.VERIFICATION_SENT)
261

    
262
        # entires commited as expected
263
        self.assertEqual(AstakosUser.objects.count(), 1)
264
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
265
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
266

    
267
        # provider info stored
268
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
269
        self.assertEqual(provider.affiliation, 'Test Affiliation')
270
        self.assertEqual(provider.info, {u'email': u'kpap@synnefo.org',
271
                                         u'eppn': u'kpapeppn',
272
                                         u'name': u'Kostas Papadimitriou'})
273

    
274
        # login (not activated yet)
275
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppn",
276
                          cn="Kostas Papadimitriou", )
277
        r = client.get("/im/login/shibboleth?", follow=True)
278
        self.assertContains(r, 'is pending moderation')
279

    
280
        # admin activates the user
281
        u = AstakosUser.objects.get(username="kpap@synnefo.org")
282
        backend = activation_backends.get_backend()
283
        activation_result = backend.verify_user(u, u.verification_code)
284
        activation_result = backend.accept_user(u)
285
        self.assertFalse(activation_result.is_error())
286
        backend.send_result_notifications(activation_result, u)
287
        self.assertEqual(u.is_active, True)
288

    
289
        # we see our profile
290
        r = client.get("/im/login/shibboleth?", follow=True)
291
        self.assertRedirects(r, '/im/landing')
292
        self.assertEqual(r.status_code, 200)
293

    
294
    def test_existing(self):
295
        """
296
        Test adding of third party login to an existing account
297
        """
298

    
299
        # this is our existing user
300
        existing_user = get_local_user('kpap@synnefo.org')
301
        existing_inactive = get_local_user('kpap-inactive@synnefo.org')
302
        existing_inactive.is_active = False
303
        existing_inactive.save()
304

    
305
        existing_unverified = get_local_user('kpap-unverified@synnefo.org')
306
        existing_unverified.is_active = False
307
        existing_unverified.activation_sent = None
308
        existing_unverified.email_verified = False
309
        existing_unverified.is_verified = False
310
        existing_unverified.save()
311

    
312
        client = ShibbolethClient()
313
        # shibboleth logged us in, notice that we use different email
314
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
315
                          cn="Kostas Papadimitriou", )
316
        r = client.get("/im/login/shibboleth?", follow=True)
317

    
318
        # a new pending user created
319
        pending_user = PendingThirdPartyUser.objects.get()
320
        token = pending_user.token
321
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
322
        pending_key = pending_user.token
323
        client.reset_tokens()
324
        self.assertRedirects(r, "/im/signup?third_party_token=%s" % token)
325

    
326
        form = r.context['form']
327
        signupdata = copy.copy(form.initial)
328
        signupdata['email'] = 'kpap@synnefo.org'
329
        signupdata['third_party_token'] = token
330
        signupdata['provider'] = 'shibboleth'
331
        signupdata.pop('id', None)
332

    
333
        # the email exists to another user
334
        r = client.post("/im/signup", signupdata)
335
        self.assertContains(r, "There is already an account with this email "
336
                               "address")
337
        # change the case, still cannot create
338
        signupdata['email'] = 'KPAP@synnefo.org'
339
        r = client.post("/im/signup", signupdata)
340
        self.assertContains(r, "There is already an account with this email "
341
                               "address")
342
        # inactive user
343
        signupdata['email'] = 'KPAP-inactive@synnefo.org'
344
        r = client.post("/im/signup", signupdata)
345
        self.assertContains(r, "There is already an account with this email "
346
                               "address")
347

    
348
        # unverified user, this should pass, old entry will be deleted
349
        signupdata['email'] = 'KAPAP-unverified@synnefo.org'
350
        r = client.post("/im/signup", signupdata)
351

    
352
        post_data = {'password': 'password',
353
                     'username': 'kpap@synnefo.org'}
354
        r = client.post('/im/local', post_data, follow=True)
355
        self.assertTrue(r.context['request'].user.is_authenticated())
356
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
357
                          cn="Kostas Papadimitriou", )
358
        r = client.get("/im/login/shibboleth?", follow=True)
359
        self.assertContains(r, "enabled for this account")
360
        client.reset_tokens()
361

    
362
        user = existing_user
363
        self.assertTrue(user.has_auth_provider('shibboleth'))
364
        self.assertTrue(user.has_auth_provider('local',
365
                                               auth_backend='astakos'))
366
        client.logout()
367

    
368
        # look Ma, i can login with both my shibboleth and local account
369
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
370
                          cn="Kostas Papadimitriou")
371
        r = client.get("/im/login/shibboleth?", follow=True)
372
        self.assertTrue(r.context['request'].user.is_authenticated())
373
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
374
        self.assertRedirects(r, '/im/landing')
375
        self.assertEqual(r.status_code, 200)
376
        client.logout()
377
        client.reset_tokens()
378

    
379
        # logged out
380
        r = client.get("/im/profile", follow=True)
381
        self.assertFalse(r.context['request'].user.is_authenticated())
382

    
383
        # login with local account also works
384
        post_data = {'password': 'password',
385
                     'username': 'kpap@synnefo.org'}
386
        r = self.client.post('/im/local', post_data, follow=True)
387
        self.assertTrue(r.context['request'].user.is_authenticated())
388
        self.assertTrue(r.context['request'].user.email == "kpap@synnefo.org")
389
        self.assertRedirects(r, '/im/landing')
390
        self.assertEqual(r.status_code, 200)
391

    
392
        # cannot add the same eppn
393
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
394
                          cn="Kostas Papadimitriou", )
395
        r = client.get("/im/login/shibboleth?", follow=True)
396
        self.assertRedirects(r, '/im/landing')
397
        self.assertTrue(r.status_code, 200)
398
        self.assertEquals(existing_user.auth_providers.count(), 2)
399

    
400
        # only one allowed by default
401
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
402
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
403
        prov = auth_providers.get_provider('shibboleth')
404
        r = client.get("/im/login/shibboleth?", follow=True)
405
        self.assertContains(r, "Failed to add")
406
        self.assertRedirects(r, '/im/profile')
407
        self.assertTrue(r.status_code, 200)
408
        self.assertEquals(existing_user.auth_providers.count(), 2)
409
        client.logout()
410
        client.reset_tokens()
411

    
412
        # cannot login with another eppn
413
        client.set_tokens(mail="kpap@synnefo.org", eppn="kpapeppninvalid",
414
                          cn="Kostas Papadimitriou")
415
        r = client.get("/im/login/shibboleth?", follow=True)
416
        self.assertFalse(r.context['request'].user.is_authenticated())
417

    
418
        # cannot
419

    
420
        # lets remove local password
421
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
422
                                       email="kpap@synnefo.org")
423
        remove_local_url = user.get_auth_provider('local').get_remove_url
424
        remove_shibbo_url = user.get_auth_provider('shibboleth',
425
                                                   'kpapeppn').get_remove_url
426
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
427
                          cn="Kostas Papadimtriou")
428
        r = client.get("/im/login/shibboleth?", follow=True)
429
        client.reset_tokens()
430

    
431
        # TODO: this view should use POST
432
        r = client.get(remove_local_url)
433
        # 2 providers left
434
        self.assertEqual(user.auth_providers.count(), 1)
435
        # cannot remove last provider
436
        r = client.get(remove_shibbo_url)
437
        self.assertEqual(r.status_code, 403)
438
        self.client.logout()
439

    
440
        # cannot login using local credentials (notice we use another client)
441
        post_data = {'password': 'password',
442
                     'username': 'kpap@synnefo.org'}
443
        r = self.client.post('/im/local', post_data, follow=True)
444
        self.assertFalse(r.context['request'].user.is_authenticated())
445

    
446
        # we can reenable the local provider by setting a password
447
        r = client.get("/im/password_change", follow=True)
448
        r = client.post("/im/password_change", {'new_password1': '111',
449
                                                'new_password2': '111'},
450
                        follow=True)
451
        user = r.context['request'].user
452
        self.assertTrue(user.has_auth_provider('local'))
453
        self.assertTrue(user.has_auth_provider('shibboleth'))
454
        self.assertTrue(user.check_password('111'))
455
        self.assertTrue(user.has_usable_password())
456
        self.client.logout()
457

    
458
        # now we can login
459
        post_data = {'password': '111',
460
                     'username': 'kpap@synnefo.org'}
461
        r = self.client.post('/im/local', post_data, follow=True)
462
        self.assertTrue(r.context['request'].user.is_authenticated())
463

    
464
        client.reset_tokens()
465

    
466
        # we cannot take over another shibboleth identifier
467
        user2 = get_local_user('another@synnefo.org')
468
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
469
        # login
470
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
471
                          cn="Kostas Papadimitriou")
472
        r = client.get("/im/login/shibboleth?", follow=True)
473
        # try to assign existing shibboleth identifier of another user
474
        client.set_tokens(mail="kpap_second@shibboleth.gr",
475
                          eppn="existingeppn", cn="Kostas Papadimitriou")
476
        r = client.get("/im/login/shibboleth?", follow=True)
477
        self.assertContains(r, "this account is already assigned")
478

    
479

    
480
class TestLocal(TestCase):
481

    
482
    fixtures = ['groups']
483

    
484
    def setUp(self):
485
        settings.ADMINS = (('admin', 'support@cloud.synnefo.org'),)
486
        settings.SERVER_EMAIL = 'no-reply@synnefo.org'
487
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
488
        settings.ASTAKOS_MODERATION_ENABLED = True
489

    
490
    def tearDown(self):
491
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
492

    
493
    def test_no_moderation(self):
494
        # disable moderation
495
        astakos_settings.MODERATION_ENABLED = False
496

    
497
        # create a new user
498
        r = self.client.get("/im/signup")
499
        self.assertEqual(r.status_code, 200)
500
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
501
                'password2': 'password', 'first_name': 'Kostas',
502
                'last_name': 'Mitroglou', 'provider': 'local'}
503
        r = self.client.post("/im/signup", data)
504

    
505
        # user created
506
        self.assertEqual(AstakosUser.objects.count(), 1)
507
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
508
                                       email="kpap@synnefo.org")
509
        self.assertEqual(user.username, 'kpap@synnefo.org')
510
        self.assertEqual(user.has_auth_provider('local'), True)
511
        self.assertFalse(user.is_active)
512

    
513
        # user (but not admin) gets notified
514
        self.assertEqual(len(get_mailbox('support@cloud.synnefo.org')), 0)
515
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
516
        astakos_settings.MODERATION_ENABLED = True
517

    
518
    def test_email_case(self):
519
        data = {
520
            'email': 'kPap@synnefo.org',
521
            'password1': '1234',
522
            'password2': '1234'
523
        }
524

    
525
        form = forms.LocalUserCreationForm(data)
526
        self.assertTrue(form.is_valid())
527
        user = form.save()
528
        form.store_user(user, {})
529

    
530
        u = AstakosUser.objects.get()
531
        self.assertEqual(u.email, 'kPap@synnefo.org')
532
        self.assertEqual(u.username, 'kpap@synnefo.org')
533
        u.is_active = True
534
        u.email_verified = True
535
        u.save()
536

    
537
        data = {'username': 'kpap@synnefo.org', 'password': '1234'}
538
        login = forms.LoginForm(data=data)
539
        self.assertTrue(login.is_valid())
540

    
541
        data = {'username': 'KpaP@synnefo.org', 'password': '1234'}
542
        login = forms.LoginForm(data=data)
543
        self.assertTrue(login.is_valid())
544

    
545
        data = {
546
            'email': 'kpap@synnefo.org',
547
            'password1': '1234',
548
            'password2': '1234'
549
        }
550
        form = forms.LocalUserCreationForm(data)
551
        self.assertFalse(form.is_valid())
552

    
553
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
554
                 FORCE_PROFILE_UPDATE=False, MODERATION_ENABLED=True)
555
    def test_local_provider(self):
556
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
557

    
558
        # create a user
559
        r = self.client.get("/im/signup")
560
        self.assertEqual(r.status_code, 200)
561
        data = {'email': 'kpap@synnefo.org', 'password1': 'password',
562
                'password2': 'password', 'first_name': 'Kostas',
563
                'last_name': 'Mitroglou', 'provider': 'local'}
564
        r = self.client.post("/im/signup", data)
565

    
566
        # user created
567
        self.assertEqual(AstakosUser.objects.count(), 1)
568
        user = AstakosUser.objects.get(username="kpap@synnefo.org",
569
                                       email="kpap@synnefo.org")
570
        self.assertEqual(user.username, 'kpap@synnefo.org')
571
        self.assertEqual(user.has_auth_provider('local'), True)
572
        self.assertFalse(user.is_active)  # not activated
573
        self.assertFalse(user.email_verified)  # not verified
574
        self.assertTrue(user.activation_sent)  # activation automatically sent
575
        self.assertFalse(user.moderated)
576
        self.assertFalse(user.email_verified)
577

    
578
        # admin gets notified and activates the user from the command line
579
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
580
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
581
                                           'password': 'password'},
582
                             follow=True)
583
        self.assertContains(r, messages.VERIFICATION_SENT)
584
        backend = activation_backends.get_backend()
585

    
586
        user = AstakosUser.objects.get(username="kpap@synnefo.org")
587
        backend.send_user_verification_email(user)
588

    
589
        # user activation fields updated and user gets notified via email
590
        user = AstakosUser.objects.get(pk=user.pk)
591
        self.assertTrue(user.activation_sent)
592
        self.assertFalse(user.email_verified)
593
        self.assertFalse(user.is_active)
594
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 2)
595

    
596
        # user forgot she got registered and tries to submit registration
597
        # form. Notice the upper case in email
598
        data = {'email': 'KPAP@synnefo.org', 'password1': 'password',
599
                'password2': 'password', 'first_name': 'Kostas',
600
                'last_name': 'Mitroglou', 'provider': 'local'}
601
        r = self.client.post("/im/signup", data, follow=True)
602
        self.assertRedirects(r, reverse('index'))
603
        self.assertContains(r, messages.VERIFICATION_SENT)
604

    
605
        user = AstakosUser.objects.get()
606
        # previous user replaced
607
        self.assertTrue(user.activation_sent)
608
        self.assertFalse(user.email_verified)
609
        self.assertFalse(user.is_active)
610
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 1)
611

    
612
        # hmmm, email exists; lets request a password change
613
        r = self.client.get('/im/local/password_reset')
614
        self.assertEqual(r.status_code, 200)
615
        data = {'email': 'kpap@synnefo.org'}
616
        r = self.client.post('/im/local/password_reset', data, follow=True)
617
        # she can't because account is not active yet
618
        self.assertContains(r, 'pending activation')
619

    
620
        # moderation is enabled and an activation email has already been sent
621
        # so user can trigger resend of the activation email
622
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
623
        self.assertContains(r, 'has been sent to your email address.')
624
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 2)
625

    
626
        # also she cannot login
627
        data = {'username': 'kpap@synnefo.org', 'password': 'password'}
628
        r = self.client.post('/im/local', data, follow=True)
629
        self.assertContains(r, 'Resend activation')
630
        self.assertFalse(r.context['request'].user.is_authenticated())
631
        self.assertFalse('_pithos2_a' in self.client.cookies)
632

    
633
        # user sees the message and resends activation
634
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
635
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 3)
636

    
637
        # logged in user cannot activate another account
638
        tmp_user = get_local_user("test_existing_user@synnefo.org")
639
        tmp_client = Client()
640
        tmp_client.login(username="test_existing_user@synnefo.org",
641
                         password="password")
642
        r = tmp_client.get(user.get_activation_url(), follow=True)
643
        self.assertContains(r, messages.LOGGED_IN_WARNING)
644

    
645
        r = self.client.get(user.get_activation_url(), follow=True)
646
        # previous code got invalidated
647
        self.assertEqual(r.status_code, 404)
648

    
649
        user = AstakosUser.objects.get(pk=user.pk)
650
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 0)
651
        r = self.client.get(user.get_activation_url(), follow=True)
652
        self.assertRedirects(r, reverse('index'))
653
        # user sees that account is pending approval from admins
654
        self.assertContains(r, messages.NOTIFICATION_SENT)
655
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
656

    
657
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
658
        result = backend.handle_moderation(user)
659
        backend.send_result_notifications(result, user)
660
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
661
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 2)
662

    
663
        user = AstakosUser.objects.get(email="KPAP@synnefo.org")
664
        r = self.client.get('/im/profile', follow=True)
665
        self.assertFalse(r.context['request'].user.is_authenticated())
666
        self.assertFalse('_pithos2_a' in self.client.cookies)
667
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 4)
668

    
669
        user = AstakosUser.objects.get(pk=user.pk)
670
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
671
                                           'password': 'password'},
672
                             follow=True)
673
        # user activated and logged in, token cookie set
674
        self.assertTrue(r.context['request'].user.is_authenticated())
675
        self.assertTrue('_pithos2_a' in self.client.cookies)
676
        cookies = self.client.cookies
677
        self.assertTrue(quote(user.auth_token) in
678
                        cookies.get('_pithos2_a').value)
679
        r = self.client.get('/im/logout', follow=True)
680
        r = self.client.get('/im/')
681
        # user logged out, token cookie removed
682
        self.assertFalse(r.context['request'].user.is_authenticated())
683
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
684

    
685
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
686
        del self.client.cookies['_pithos2_a']
687

    
688
        # user can login
689
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
690
                                           'password': 'password'},
691
                             follow=True)
692
        self.assertTrue(r.context['request'].user.is_authenticated())
693
        self.assertTrue('_pithos2_a' in self.client.cookies)
694
        cookies = self.client.cookies
695
        self.assertTrue(quote(user.auth_token) in
696
                        cookies.get('_pithos2_a').value)
697
        self.client.get('/im/logout', follow=True)
698

    
699
        # user forgot password
700
        old_pass = user.password
701
        r = self.client.get('/im/local/password_reset')
702
        self.assertEqual(r.status_code, 200)
703
        r = self.client.post('/im/local/password_reset', {'email':
704
                                                          'kpap@synnefo.org'})
705
        self.assertEqual(r.status_code, 302)
706
        # email sent
707
        self.assertEqual(len(get_mailbox('KPAP@synnefo.org')), 5)
708

    
709
        # user visits change password link
710
        user = AstakosUser.objects.get(pk=user.pk)
711
        r = self.client.get(user.get_password_reset_url())
712
        r = self.client.post(user.get_password_reset_url(),
713
                             {'new_password1': 'newpass',
714
                              'new_password2': 'newpass'})
715

    
716
        user = AstakosUser.objects.get(pk=user.pk)
717
        self.assertNotEqual(old_pass, user.password)
718

    
719
        # old pass is not usable
720
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
721
                                           'password': 'password'})
722
        self.assertContains(r, 'Please enter a correct username and password')
723
        r = self.client.post('/im/local', {'username': 'kpap@synnefo.org',
724
                                           'password': 'newpass'},
725
                             follow=True)
726
        self.assertTrue(r.context['request'].user.is_authenticated())
727
        self.client.logout()
728

    
729
        # tests of special local backends
730
        user = AstakosUser.objects.get(pk=user.pk)
731
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
732
        user.save()
733

    
734
        # non astakos local backends do not support password reset
735
        r = self.client.get('/im/local/password_reset')
736
        self.assertEqual(r.status_code, 200)
737
        r = self.client.post('/im/local/password_reset', {'email':
738
                                                          'kpap@synnefo.org'})
739
        # she can't because account is not active yet
740
        self.assertContains(r, "Changing password is not")
741

    
742

    
743
class UserActionsTests(TestCase):
744

    
745
    def test_email_change(self):
746
        # to test existing email validation
747
        get_local_user('existing@synnefo.org')
748

    
749
        # local user
750
        user = get_local_user('kpap@synnefo.org')
751

    
752
        # login as kpap
753
        self.client.login(username='kpap@synnefo.org', password='password')
754
        r = self.client.get('/im/profile', follow=True)
755
        user = r.context['request'].user
756
        self.assertTrue(user.is_authenticated())
757

    
758
        # change email is enabled
759
        r = self.client.get('/im/email_change')
760
        self.assertEqual(r.status_code, 200)
761
        self.assertFalse(user.email_change_is_pending())
762

    
763
        # request email change to an existing email fails
764
        data = {'new_email_address': 'existing@synnefo.org'}
765
        r = self.client.post('/im/email_change', data)
766
        self.assertContains(r, messages.EMAIL_USED)
767

    
768
        # proper email change
769
        data = {'new_email_address': 'kpap@gmail.com'}
770
        r = self.client.post('/im/email_change', data, follow=True)
771
        self.assertRedirects(r, '/im/profile')
772
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
773
        change1 = EmailChange.objects.get()
774

    
775
        # user sees a warning
776
        r = self.client.get('/im/email_change')
777
        self.assertEqual(r.status_code, 200)
778
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
779
        self.assertTrue(user.email_change_is_pending())
780

    
781
        # link was sent
782
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
783
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
784

    
785
        # proper email change
786
        data = {'new_email_address': 'kpap@yahoo.com'}
787
        r = self.client.post('/im/email_change', data, follow=True)
788
        self.assertRedirects(r, '/im/profile')
789
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
790
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 0)
791
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
792
        change2 = EmailChange.objects.get()
793

    
794
        r = self.client.get(change1.get_url())
795
        self.assertEquals(r.status_code, 302)
796
        self.client.logout()
797

    
798
        r = self.client.post('/im/local?next=' + change2.get_url(),
799
                             {'username': 'kpap@synnefo.org',
800
                              'password': 'password',
801
                              'next': change2.get_url()},
802
                             follow=True)
803
        self.assertRedirects(r, '/im/profile')
804
        user = r.context['request'].user
805
        self.assertEquals(user.email, 'kpap@yahoo.com')
806
        self.assertEquals(user.username, 'kpap@yahoo.com')
807

    
808
        self.client.logout()
809
        r = self.client.post('/im/local?next=' + change2.get_url(),
810
                             {'username': 'kpap@synnefo.org',
811
                              'password': 'password',
812
                              'next': change2.get_url()},
813
                             follow=True)
814
        self.assertContains(r, "Please enter a correct username and password")
815
        self.assertEqual(user.emailchanges.count(), 0)
816

    
817

    
818
class TestAuthProviderViews(TestCase):
819

    
820
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
821
                         AUTOMODERATE_POLICY=True)
822
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
823
                 FORCE_PROFILE_UPDATE=False)
824
    def test_user(self):
825
        Profile = AuthProviderPolicyProfile
826
        Pending = PendingThirdPartyUser
827
        User = AstakosUser
828

    
829
        User.objects.create(email="newuser@synnefo.org")
830
        get_local_user("olduser@synnefo.org")
831
        cl_olduser = ShibbolethClient()
832
        get_local_user("olduser2@synnefo.org")
833
        ShibbolethClient()
834
        cl_newuser = ShibbolethClient()
835
        cl_newuser2 = Client()
836

    
837
        academic_group, created = Group.objects.get_or_create(
838
            name='academic-login')
839
        academic_users = academic_group.user_set
840
        assert created
841
        policy_only_academic = Profile.objects.add_policy('academic_strict',
842
                                                          'shibboleth',
843
                                                          academic_group,
844
                                                          exclusive=True,
845
                                                          login=False,
846
                                                          add=False)
847

    
848
        # new academic user
849
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
850
        cl_newuser.set_tokens(eppn="newusereppn")
851
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
852
        pending = Pending.objects.get()
853
        identifier = pending.third_party_identifier
854
        signup_data = {'third_party_identifier': identifier,
855
                       'first_name': 'Academic',
856
                       'third_party_token': pending.token,
857
                       'last_name': 'New User',
858
                       'provider': 'shibboleth'}
859
        r = cl_newuser.post('/im/signup', signup_data)
860
        self.assertContains(r, "This field is required", )
861
        signup_data['email'] = 'olduser@synnefo.org'
862
        r = cl_newuser.post('/im/signup', signup_data)
863
        self.assertContains(r, "already an account with this email", )
864
        signup_data['email'] = 'newuser@synnefo.org'
865
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
866
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
867
        self.assertEqual(r.status_code, 404)
868
        newuser = User.objects.get(email="newuser@synnefo.org")
869
        activation_link = newuser.get_activation_url()
870
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
871

    
872
        # new non-academic user
873
        signup_data = {'first_name': 'Non Academic',
874
                       'last_name': 'New User',
875
                       'provider': 'local',
876
                       'password1': 'password',
877
                       'password2': 'password'}
878
        signup_data['email'] = 'olduser@synnefo.org'
879
        r = cl_newuser2.post('/im/signup', signup_data)
880
        self.assertContains(r, 'There is already an account with this '
881
                               'email address')
882
        signup_data['email'] = 'newuser@synnefo.org'
883
        r = cl_newuser2.post('/im/signup/', signup_data)
884
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
885
        r = self.client.get(activation_link, follow=True)
886
        self.assertEqual(r.status_code, 404)
887
        newuser = User.objects.get(email="newuser@synnefo.org")
888
        self.assertTrue(newuser.activation_sent)
889

    
890
        # activation sent, user didn't open verification url so additional
891
        # registrations invalidate the previous signups.
892
        self.assertFalse(academic_users.filter(email='newuser@synnefo.org'))
893
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
894
        pending = Pending.objects.get()
895
        identifier = pending.third_party_identifier
896
        signup_data = {'third_party_identifier': identifier,
897
                       'first_name': 'Academic',
898
                       'third_party_token': pending.token,
899
                       'last_name': 'New User',
900
                       'provider': 'shibboleth'}
901
        signup_data['email'] = 'newuser@synnefo.org'
902
        r = cl_newuser.post('/im/signup', signup_data)
903
        self.assertEqual(r.status_code, 302)
904
        newuser = User.objects.get(email="newuser@synnefo.org")
905
        self.assertTrue(newuser.activation_sent)
906
        activation_link = newuser.get_activation_url()
907
        self.assertTrue(academic_users.get(email='newuser@synnefo.org'))
908
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
909
        self.assertRedirects(r, '/im/landing')
910
        newuser = User.objects.get(email="newuser@synnefo.org")
911
        self.assertEqual(newuser.is_active, True)
912
        self.assertEqual(newuser.email_verified, True)
913
        cl_newuser.logout()
914

    
915
        # cannot reactivate if suspended
916
        newuser.is_active = False
917
        newuser.save()
918
        r = cl_newuser.get(newuser.get_activation_url())
919
        newuser = User.objects.get(email="newuser@synnefo.org")
920
        self.assertFalse(newuser.is_active)
921

    
922
        # release suspension
923
        newuser.is_active = True
924
        newuser.save()
925

    
926
        cl_newuser.get('/im/login/shibboleth?', follow=True)
927
        local = auth.get_provider('local', newuser)
928
        self.assertEqual(local.get_add_policy, False)
929
        self.assertEqual(local.get_login_policy, False)
930
        r = cl_newuser.get(local.get_add_url, follow=True)
931
        self.assertRedirects(r, '/im/profile')
932
        self.assertContains(r, 'disabled for your')
933

    
934
        cl_olduser.login(username='olduser@synnefo.org', password="password")
935
        r = cl_olduser.get('/im/profile', follow=True)
936
        self.assertEqual(r.status_code, 200)
937
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
938
        self.assertContains(r, 'Your request is missing a unique token')
939
        cl_olduser.set_tokens(eppn="newusereppn")
940
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
941
        self.assertContains(r, 'is already assigned to another user')
942
        cl_olduser.set_tokens(eppn="oldusereppn")
943
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
944
        self.assertContains(r, 'Academic login enabled for this account')
945

    
946
        user = User.objects.get(email="olduser@synnefo.org")
947
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
948
        local_provider = user.get_auth_provider('local')
949
        self.assertEqual(shib_provider.get_remove_policy, True)
950
        self.assertEqual(local_provider.get_remove_policy, True)
951

    
952
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
953
                                                          'shibboleth',
954
                                                          academic_group,
955
                                                          remove=False)
956
        user.groups.add(academic_group)
957
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
958
        local_provider = user.get_auth_provider('local')
959
        self.assertEqual(shib_provider.get_remove_policy, False)
960
        self.assertEqual(local_provider.get_remove_policy, True)
961
        self.assertEqual(local_provider.get_login_policy, False)
962

    
963
        cl_olduser.logout()
964
        login_data = {'username': 'olduser@synnefo.org', 'password': 'password'}
965
        r = cl_olduser.post('/im/local', login_data, follow=True)
966
        self.assertContains(r, "href='/im/login/shibboleth'>Academic login")
967

    
968

    
969
class TestAuthProvidersAPI(TestCase):
970
    """
971
    Test auth_providers module API
972
    """
973

    
974
    @im_settings(IM_MODULES=['local', 'shibboleth'])
975
    def test_create(self):
976
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
977
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
978

    
979
        module = 'shibboleth'
980
        identifier = 'SHIB_UUID'
981
        provider_params = {
982
            'affiliation': 'UNIVERSITY',
983
            'info': {'age': 27}
984
        }
985
        provider = auth.get_provider(module, user2, identifier,
986
                                     **provider_params)
987
        provider.add_to_user()
988
        provider = auth.get_provider(module, user, identifier,
989
                                     **provider_params)
990
        provider.add_to_user()
991
        user.email_verified = True
992
        user.save()
993
        self.assertRaises(Exception, provider.add_to_user)
994
        provider = user.get_auth_provider(module, identifier)
995
        self.assertEqual(user.get_auth_provider(
996
            module, identifier)._instance.info.get('age'), 27)
997

    
998
        module = 'local'
999
        identifier = None
1000
        provider_params = {'auth_backend': 'ldap', 'info':
1001
                          {'office': 'A1'}}
1002
        provider = auth.get_provider(module, user, identifier,
1003
                                     **provider_params)
1004
        provider.add_to_user()
1005
        self.assertFalse(provider.get_add_policy)
1006
        self.assertRaises(Exception, provider.add_to_user)
1007

    
1008
        shib = user.get_auth_provider('shibboleth',
1009
                                      'SHIB_UUID')
1010
        self.assertTrue(shib.get_remove_policy)
1011

    
1012
        local = user.get_auth_provider('local')
1013
        self.assertTrue(local.get_remove_policy)
1014

    
1015
        local.remove_from_user()
1016
        self.assertFalse(shib.get_remove_policy)
1017
        self.assertRaises(Exception, shib.remove_from_user)
1018

    
1019
        provider = user.get_auth_providers()[0]
1020
        self.assertRaises(Exception, provider.add_to_user)
1021

    
1022
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1023
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
1024
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
1025
                                                 'group2'])
1026
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
1027
                        CREATION_GROUPS_POLICY=['localgroup-create',
1028
                                                'group-create'])
1029
    def test_add_groups(self):
1030
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
1031
        provider = auth.get_provider('shibboleth', user, 'test123')
1032
        provider.add_to_user()
1033
        user = AstakosUser.objects.get()
1034
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1035
                              sorted([u'group1', u'group2', u'group-create']))
1036

    
1037
        local = auth.get_provider('local', user)
1038
        local.add_to_user()
1039
        provider = user.get_auth_provider('shibboleth')
1040
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1041
        provider.remove_from_user()
1042
        user = AstakosUser.objects.get()
1043
        self.assertEqual(len(user.get_auth_providers()), 1)
1044
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1045
                              sorted([u'group-create', u'localgroup']))
1046

    
1047
        local = user.get_auth_provider('local')
1048
        self.assertRaises(Exception, local.remove_from_user)
1049
        provider = auth.get_provider('shibboleth', user, 'test123')
1050
        provider.add_to_user()
1051
        user = AstakosUser.objects.get()
1052
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1053
                              sorted([u'group-create', u'group1', u'group2',
1054
                               u'localgroup']))
1055

    
1056

    
1057

    
1058
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1059
    def test_policies(self):
1060
        group_old, created = Group.objects.get_or_create(name='olduser')
1061

    
1062
        astakos_settings.MODERATION_ENABLED = True
1063
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1064
            ['academic-user']
1065
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1066
            ['google-user']
1067

    
1068
        user = AstakosUser.objects.create(email="kpap@synnefo.org")
1069
        user.groups.add(group_old)
1070
        user.add_auth_provider('local')
1071

    
1072
        user2 = AstakosUser.objects.create(email="kpap2@synnefo.org")
1073
        user2.add_auth_provider('shibboleth', identifier='shibid')
1074

    
1075
        user3 = AstakosUser.objects.create(email="kpap3@synnefo.org")
1076
        user3.groups.add(group_old)
1077
        user3.add_auth_provider('local')
1078
        user3.add_auth_provider('shibboleth', identifier='1234')
1079

    
1080
        self.assertTrue(user2.groups.get(name='academic-user'))
1081
        self.assertFalse(user2.groups.filter(name='olduser').count())
1082

    
1083
        local = auth_providers.get_provider('local')
1084
        self.assertTrue(local.get_add_policy)
1085

    
1086
        academic_group = Group.objects.get(name='academic-user')
1087
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1088
                                                     academic_group,
1089
                                                     exclusive=True,
1090
                                                     add=False,
1091
                                                     login=False)
1092
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1093
                                                     academic_group,
1094
                                                     exclusive=True,
1095
                                                     login=False,
1096
                                                     add=False)
1097
        # no duplicate entry gets created
1098
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1099

    
1100
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1101
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1102
                                                     user2,
1103
                                                     remove=False)
1104
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1105

    
1106
        local = auth_providers.get_provider('local', user2)
1107
        google = auth_providers.get_provider('google', user2)
1108
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1109
        self.assertTrue(shibboleth.get_login_policy)
1110
        self.assertFalse(shibboleth.get_remove_policy)
1111
        self.assertFalse(local.get_add_policy)
1112
        self.assertFalse(local.get_add_policy)
1113
        self.assertFalse(google.get_add_policy)
1114

    
1115
        user2.groups.remove(Group.objects.get(name='academic-user'))
1116
        self.assertTrue(local.get_add_policy)
1117
        self.assertTrue(google.get_add_policy)
1118
        user2.groups.add(Group.objects.get(name='academic-user'))
1119

    
1120
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1121
                                                     user2,
1122
                                                     exclusive=True,
1123
                                                     add=True)
1124
        self.assertTrue(local.get_add_policy)
1125
        self.assertTrue(google.get_add_policy)
1126

    
1127
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1128
        self.assertFalse(local.get_automoderate_policy)
1129
        self.assertFalse(google.get_automoderate_policy)
1130
        self.assertTrue(shibboleth.get_automoderate_policy)
1131

    
1132
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1133
                  'GOOGLE_ADD_GROUPS_POLICY']:
1134
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1135

    
1136

    
1137
    @shibboleth_settings(CREATE_POLICY=True)
1138
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1139
    def test_create_http(self):
1140
        # this should be wrapped inside a transaction
1141
        user = AstakosUser(email="test@test.com")
1142
        user.save()
1143
        provider = auth_providers.get_provider('shibboleth', user,
1144
                                               'test@academia.test')
1145
        provider.add_to_user()
1146
        user.get_auth_provider('shibboleth', 'test@academia.test')
1147
        provider = auth_providers.get_provider('local', user)
1148
        provider.add_to_user()
1149
        user.get_auth_provider('local')
1150

    
1151
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1152
        user = AstakosUser(email="test2@test.com")
1153
        user.save()
1154
        provider = auth_providers.get_provider('shibboleth', user,
1155
                                               'test@shibboleth.com',
1156
                                               **{'info': {'name':
1157
                                                                'User Test'}})
1158
        self.assertFalse(provider.get_create_policy)
1159
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1160
        self.assertTrue(provider.get_create_policy)
1161
        academic = provider.add_to_user()
1162

    
1163
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1164
    @shibboleth_settings(LIMIT_POLICY=2)
1165
    def test_policies(self):
1166
        user = get_local_user('kpap@synnefo.org')
1167
        user.add_auth_provider('shibboleth', identifier='1234')
1168
        user.add_auth_provider('shibboleth', identifier='12345')
1169

    
1170
        # default limit is 1
1171
        local = user.get_auth_provider('local')
1172
        self.assertEqual(local.get_add_policy, False)
1173

    
1174
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1175
        academic = user.get_auth_provider('shibboleth',
1176
                                          identifier='1234')
1177
        self.assertEqual(academic.get_add_policy, False)
1178
        newacademic = auth_providers.get_provider('shibboleth', user,
1179
                                                  identifier='123456')
1180
        self.assertEqual(newacademic.get_add_policy, True)
1181
        user.add_auth_provider('shibboleth', identifier='123456')
1182
        self.assertEqual(academic.get_add_policy, False)
1183
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1184

    
1185
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1186
    @shibboleth_settings(LIMIT_POLICY=2)
1187
    def test_messages(self):
1188
        user = get_local_user('kpap@synnefo.org')
1189
        user.add_auth_provider('shibboleth', identifier='1234')
1190
        user.add_auth_provider('shibboleth', identifier='12345')
1191
        provider = auth_providers.get_provider('shibboleth')
1192
        self.assertEqual(provider.get_message('title'), 'Academic')
1193
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1194
        # regenerate messages cache
1195
        provider = auth_providers.get_provider('shibboleth')
1196
        self.assertEqual(provider.get_message('title'), 'New title')
1197
        self.assertEqual(provider.get_message('login_title'),
1198
                         'New title LOGIN')
1199
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1200
        self.assertEqual(provider.get_module_icon,
1201
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1202
        self.assertEqual(provider.get_module_medium_icon,
1203
                         settings.MEDIA_URL +
1204
                         'im/auth/icons-medium/shibboleth.png')
1205

    
1206
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1207
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1208
        self.assertEqual(provider.get_method_details_msg,
1209
                         'Account: 12345')
1210
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1211
        self.assertEqual(provider.get_method_details_msg,
1212
                         'Account: 1234')
1213

    
1214
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1215
        self.assertEqual(provider.get_not_active_msg,
1216
                         "'Academic login' is disabled.")
1217

    
1218
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1219
    @shibboleth_settings(LIMIT_POLICY=2)
1220
    def test_templates(self):
1221
        user = get_local_user('kpap@synnefo.org')
1222
        user.add_auth_provider('shibboleth', identifier='1234')
1223
        user.add_auth_provider('shibboleth', identifier='12345')
1224

    
1225
        provider = auth_providers.get_provider('shibboleth')
1226
        self.assertEqual(provider.get_template('login'),
1227
                         'im/auth/shibboleth_login.html')
1228
        provider = auth_providers.get_provider('google')
1229
        self.assertEqual(provider.get_template('login'),
1230
                         'im/auth/generic_login.html')
1231

    
1232

    
1233
class TestActivationBackend(TestCase):
1234

    
1235
    def setUp(self):
1236
        # dummy call to pass through logging middleware
1237
        self.client.get('/im/')
1238

    
1239
    @im_settings(RE_USER_EMAIL_PATTERNS=['.*@synnefo.org'])
1240
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
1241
    def test_policies(self):
1242
        backend = activation_backends.get_backend()
1243

    
1244
        # email matches RE_USER_EMAIL_PATTERNS
1245
        user1 = get_local_user('kpap@synnefo.org', moderated=False,
1246
                               is_active=False, email_verified=False)
1247
        backend.handle_verification(user1, user1.verification_code)
1248
        self.assertEqual(user1.accepted_policy, 'email')
1249

    
1250
        # manually moderated
1251
        user2 = get_local_user('kpap@synnefo-bad.org', moderated=False,
1252
                               is_active=False, email_verified=False)
1253

    
1254
        backend.handle_verification(user2, user2.verification_code)
1255
        self.assertEqual(user2.moderated, False)
1256
        backend.handle_moderation(user2)
1257
        self.assertEqual(user2.moderated, True)
1258
        self.assertEqual(user2.accepted_policy, 'manual')
1259

    
1260
        # autoaccept due to provider automoderate policy
1261
        user3 = get_local_user('kpap2@synnefo-bad.org', moderated=False,
1262
                               is_active=False, email_verified=False)
1263
        user3.auth_providers.all().delete()
1264
        user3.add_auth_provider('shibboleth', identifier='shib123')
1265
        backend.handle_verification(user3, user3.verification_code)
1266
        self.assertEqual(user3.moderated, True)
1267
        self.assertEqual(user3.accepted_policy, 'auth_provider_shibboleth')
1268

    
1269
    @im_settings(MODERATION_ENABLED=False,
1270
                 MANAGERS=(('Manager',
1271
                            'manager@synnefo.org'),),
1272
                 HELPDESK=(('Helpdesk',
1273
                            'helpdesk@synnefo.org'),),
1274
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1275
    def test_without_moderation(self):
1276
        backend = activation_backends.get_backend()
1277
        form = backend.get_signup_form('local')
1278
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1279

    
1280
        user_data = {
1281
            'email': 'kpap@synnefo.org',
1282
            'first_name': 'Kostas Papas',
1283
            'password1': '123',
1284
            'password2': '123'
1285
        }
1286
        form = backend.get_signup_form('local', user_data)
1287
        user = form.save(commit=False)
1288
        form.store_user(user)
1289
        self.assertEqual(user.is_active, False)
1290
        self.assertEqual(user.email_verified, False)
1291

    
1292
        # step one, registration
1293
        result = backend.handle_registration(user)
1294
        user = AstakosUser.objects.get()
1295
        self.assertEqual(user.is_active, False)
1296
        self.assertEqual(user.email_verified, False)
1297
        self.assertTrue(user.verification_code)
1298
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1299
        backend.send_result_notifications(result, user)
1300
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1301
        self.assertEqual(len(mail.outbox), 1)
1302

    
1303
        # step two, verify email (automatically
1304
        # moderates/accepts user, since moderation is disabled)
1305
        user = AstakosUser.objects.get()
1306
        valid_code = user.verification_code
1307

    
1308
        # test invalid code
1309
        result = backend.handle_verification(user, valid_code)
1310
        backend.send_result_notifications(result, user)
1311
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1312
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1313
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1314
        # verification + activated + greeting = 3
1315
        self.assertEqual(len(mail.outbox), 3)
1316
        user = AstakosUser.objects.get()
1317
        self.assertEqual(user.is_active, True)
1318
        self.assertEqual(user.moderated, True)
1319
        self.assertTrue(user.moderated_at)
1320
        self.assertEqual(user.email_verified, True)
1321
        self.assertTrue(user.activation_sent)
1322

    
1323
    @im_settings(MODERATION_ENABLED=True,
1324
                 MANAGERS=(('Manager',
1325
                            'manager@synnefo.org'),),
1326
                 HELPDESK=(('Helpdesk',
1327
                            'helpdesk@synnefo.org'),),
1328
                 ADMINS=(('Admin', 'admin@synnefo.org'), ))
1329
    def test_with_moderation(self):
1330

    
1331
        backend = activation_backends.get_backend()
1332
        form = backend.get_signup_form('local')
1333
        self.assertTrue(isinstance(form, forms.LocalUserCreationForm))
1334

    
1335
        user_data = {
1336
            'email': 'kpap@synnefo.org',
1337
            'first_name': 'Kostas Papas',
1338
            'password1': '123',
1339
            'password2': '123'
1340
        }
1341
        form = backend.get_signup_form(provider='local',
1342
                                       initial_data=user_data)
1343
        user = form.save(commit=False)
1344
        form.store_user(user)
1345
        self.assertEqual(user.is_active, False)
1346
        self.assertEqual(user.email_verified, False)
1347

    
1348
        # step one, registration
1349
        result = backend.handle_registration(user)
1350
        user = AstakosUser.objects.get()
1351
        self.assertEqual(user.is_active, False)
1352
        self.assertEqual(user.email_verified, False)
1353
        self.assertTrue(user.verification_code)
1354
        self.assertEqual(result.status, backend.Result.PENDING_VERIFICATION)
1355
        backend.send_result_notifications(result, user)
1356
        self.assertEqual(len(get_mailbox('kpap@synnefo.org')), 1)
1357
        self.assertEqual(len(mail.outbox), 1)
1358

    
1359
        # step two, verifying email
1360
        user = AstakosUser.objects.get()
1361
        valid_code = user.verification_code
1362
        invalid_code = user.verification_code + 'invalid'
1363

    
1364
        # test invalid code
1365
        result = backend.handle_verification(user, invalid_code)
1366
        self.assertEqual(result.status, backend.Result.ERROR)
1367
        backend.send_result_notifications(result, user)
1368
        user = AstakosUser.objects.get()
1369
        self.assertEqual(user.is_active, False)
1370
        self.assertEqual(user.moderated, False)
1371
        self.assertEqual(user.moderated_at, None)
1372
        self.assertEqual(user.email_verified, False)
1373
        self.assertTrue(user.activation_sent)
1374

    
1375
        # test valid code
1376
        user = AstakosUser.objects.get()
1377
        result = backend.handle_verification(user, valid_code)
1378
        backend.send_result_notifications(result, user)
1379
        self.assertEqual(len(get_mailbox('manager@synnefo.org')), 1)
1380
        self.assertEqual(len(get_mailbox('helpdesk@synnefo.org')), 1)
1381
        self.assertEqual(len(get_mailbox('admin@synnefo.org')), 1)
1382
        self.assertEqual(len(mail.outbox), 2)
1383
        user = AstakosUser.objects.get()
1384
        self.assertEqual(user.moderated, False)
1385
        self.assertEqual(user.moderated_at, None)
1386
        self.assertEqual(user.email_verified, True)
1387
        self.assertTrue(user.activation_sent)
1388

    
1389
        # test code reuse
1390
        result = backend.handle_verification(user, valid_code)
1391
        self.assertEqual(result.status, backend.Result.ERROR)
1392
        user = AstakosUser.objects.get()
1393
        self.assertEqual(user.is_active, False)
1394
        self.assertEqual(user.moderated, False)
1395
        self.assertEqual(user.moderated_at, None)
1396
        self.assertEqual(user.email_verified, True)
1397
        self.assertTrue(user.activation_sent)
1398

    
1399
        # valid code on verified user
1400
        user = AstakosUser.objects.get()
1401
        valid_code = user.verification_code
1402
        result = backend.handle_verification(user, valid_code)
1403
        self.assertEqual(result.status, backend.Result.ERROR)
1404

    
1405
        # step three, moderation user
1406
        user = AstakosUser.objects.get()
1407
        result = backend.handle_moderation(user)
1408
        backend.send_result_notifications(result, user)
1409

    
1410
        user = AstakosUser.objects.get()
1411
        self.assertEqual(user.is_active, True)
1412
        self.assertEqual(user.moderated, True)
1413
        self.assertTrue(user.moderated_at)
1414
        self.assertEqual(user.email_verified, True)
1415
        self.assertTrue(user.activation_sent)
1416

    
1417

    
1418
class TestProjects(TestCase):
1419
    """
1420
    Test projects.
1421
    """
1422
    def setUp(self):
1423
        # astakos resources
1424
        self.astakos_service = Service.objects.create(name="astakos",
1425
                                                      api_url="/astakos/api/")
1426
        self.resource = Resource.objects.create(name="astakos.pending_app",
1427
                                                uplimit=0,
1428
                                                allow_in_projects=False,
1429
                                                service=self.astakos_service)
1430

    
1431
        # custom service resources
1432
        self.service = Service.objects.create(name="service1",
1433
                                              api_url="http://service.api")
1434
        self.resource = Resource.objects.create(name="service1.resource",
1435
                                                uplimit=100,
1436
                                                service=self.service)
1437
        self.admin = get_local_user("projects-admin@synnefo.org")
1438
        self.admin.uuid = 'uuid1'
1439
        self.admin.save()
1440

    
1441
        self.user = get_local_user("user@synnefo.org")
1442
        self.member = get_local_user("member@synnefo.org")
1443
        self.member2 = get_local_user("member2@synnefo.org")
1444

    
1445
        self.admin_client = get_user_client("projects-admin@synnefo.org")
1446
        self.user_client = get_user_client("user@synnefo.org")
1447
        self.member_client = get_user_client("member@synnefo.org")
1448
        self.member2_client = get_user_client("member2@synnefo.org")
1449

    
1450
        quotas.qh_sync_users(AstakosUser.objects.all())
1451

    
1452
    @im_settings(PROJECT_ADMINS=['uuid1'])
1453
    def test_application_limit(self):
1454
        # user cannot create a project
1455
        r = self.user_client.get(reverse('project_add'), follow=True)
1456
        self.assertRedirects(r, reverse('project_list'))
1457
        self.assertContains(r, "You are not allowed to create a new project")
1458

    
1459
        # but admin can
1460
        r = self.admin_client.get(reverse('project_add'), follow=True)
1461
        self.assertRedirects(r, reverse('project_add'))
1462

    
1463
    @im_settings(PROJECT_ADMINS=['uuid1'])
1464
    def test_allow_in_project(self):
1465
        dfrom = datetime.now()
1466
        dto = datetime.now() + timedelta(days=30)
1467

    
1468
        # astakos.pending_uplimit allow_in_project flag is False
1469
        # we shouldn't be able to create a project application using this
1470
        # resource.
1471
        application_data = {
1472
            'name': 'project.synnefo.org',
1473
            'homepage': 'https://www.synnefo.org',
1474
            'start_date': dfrom.strftime("%Y-%m-%d"),
1475
            'end_date': dto.strftime("%Y-%m-%d"),
1476
            'member_join_policy': 2,
1477
            'member_leave_policy': 1,
1478
            'service1.resource_uplimit': 100,
1479
            'is_selected_service1.resource': "1",
1480
            'astakos.pending_app_uplimit': 100,
1481
            'is_selected_accounts': "1",
1482
            'user': self.user.pk
1483
        }
1484
        form = forms.ProjectApplicationForm(data=application_data)
1485
        # form is invalid
1486
        self.assertEqual(form.is_valid(), False)
1487

    
1488
        del application_data['astakos.pending_app_uplimit']
1489
        del application_data['is_selected_accounts']
1490
        form = forms.ProjectApplicationForm(data=application_data)
1491
        self.assertEqual(form.is_valid(), True)
1492

    
1493
    @im_settings(PROJECT_ADMINS=['uuid1'])
1494
    def test_applications(self):
1495
        # let user have 2 pending applications
1496
        quotas.add_base_quota(self.user, 'astakos.pending_app', 2)
1497

    
1498
        r = self.user_client.get(reverse('project_add'), follow=True)
1499
        self.assertRedirects(r, reverse('project_add'))
1500

    
1501
        # user fills the project application form
1502
        post_url = reverse('project_add') + '?verify=1'
1503
        dfrom = datetime.now()
1504
        dto = datetime.now() + timedelta(days=30)
1505
        application_data = {
1506
            'name': 'project.synnefo.org',
1507
            'homepage': 'https://www.synnefo.org',
1508
            'start_date': dfrom.strftime("%Y-%m-%d"),
1509
            'end_date': dto.strftime("%Y-%m-%d"),
1510
            'member_join_policy': 2,
1511
            'member_leave_policy': 1,
1512
            'service1.resource_uplimit': 100,
1513
            'is_selected_service1.resource': "1",
1514
            'user': self.user.pk
1515
        }
1516
        r = self.user_client.post(post_url, data=application_data, follow=True)
1517
        self.assertEqual(r.status_code, 200)
1518
        self.assertEqual(r.context['form'].is_valid(), True)
1519

    
1520
        # confirm request
1521
        post_url = reverse('project_add') + '?verify=0&edit=0'
1522
        r = self.user_client.post(post_url, data=application_data, follow=True)
1523
        self.assertContains(r, "The project application has been received")
1524
        self.assertRedirects(r, reverse('project_list'))
1525
        self.assertEqual(ProjectApplication.objects.count(), 1)
1526
        app1_id = ProjectApplication.objects.filter().order_by('pk')[0].pk
1527

    
1528
        # create another one
1529
        application_data['name'] = 'project2.synnefo.org'
1530
        r = self.user_client.post(post_url, data=application_data, follow=True)
1531
        app2_id = ProjectApplication.objects.filter().order_by('pk')[1].pk
1532

    
1533
        # no more applications (LIMIT is 2)
1534
        r = self.user_client.get(reverse('project_add'), follow=True)
1535
        self.assertRedirects(r, reverse('project_list'))
1536
        self.assertContains(r, "You are not allowed to create a new project")
1537

    
1538
        # login
1539
        self.admin_client.get(reverse("edit_profile"))
1540
        # admin approves
1541
        r = self.admin_client.post(reverse('project_app_approve',
1542
                                           kwargs={'application_id': app1_id}),
1543
                                   follow=True)
1544
        self.assertEqual(r.status_code, 200)
1545

    
1546
        # project created
1547
        self.assertEqual(Project.objects.count(), 1)
1548

    
1549
        # login
1550
        self.member_client.get(reverse("edit_profile"))
1551
        # cannot join app2 (not approved yet)
1552
        join_url = reverse("project_join", kwargs={'chain_id': app2_id})
1553
        r = self.member_client.post(join_url, follow=True)
1554
        self.assertEqual(r.status_code, 403)
1555

    
1556
        # can join app1
1557
        self.member_client.get(reverse("edit_profile"))
1558
        join_url = reverse("project_join", kwargs={'chain_id': app1_id})
1559
        r = self.member_client.post(join_url, follow=True)
1560
        self.assertEqual(r.status_code, 200)
1561

    
1562
        memberships = ProjectMembership.objects.all()
1563
        self.assertEqual(len(memberships), 1)
1564
        memb_id = memberships[0].id
1565

    
1566
        reject_member_url = reverse('project_reject_member',
1567
                                    kwargs={'chain_id': app1_id, 'memb_id':
1568
                                            memb_id})
1569
        accept_member_url = reverse('project_accept_member',
1570
                                    kwargs={'chain_id': app1_id, 'memb_id':
1571
                                            memb_id})
1572

    
1573
        # only project owner is allowed to reject
1574
        r = self.member_client.post(reject_member_url, follow=True)
1575
        self.assertContains(r, "You do not have the permissions")
1576
        self.assertEqual(r.status_code, 200)
1577

    
1578
        # user (owns project) rejects membership
1579
        r = self.user_client.post(reject_member_url, follow=True)
1580
        self.assertEqual(ProjectMembership.objects.count(), 0)
1581

    
1582
        # user rejoins
1583
        self.member_client.get(reverse("edit_profile"))
1584
        join_url = reverse("project_join", kwargs={'chain_id': app1_id})
1585
        r = self.member_client.post(join_url, follow=True)
1586
        self.assertEqual(r.status_code, 200)
1587
        self.assertEqual(ProjectMembership.objects.count(), 1)
1588

    
1589
        # user (owns project) accepts membership
1590
        r = self.user_client.post(accept_member_url, follow=True)
1591
        self.assertEqual(ProjectMembership.objects.count(), 1)
1592
        membership = ProjectMembership.objects.get()
1593
        self.assertEqual(membership.state, ProjectMembership.ACCEPTED)
1594

    
1595
        user_quotas = quotas.get_users_quotas([self.member])
1596
        resource = 'service1.resource'
1597
        newlimit = user_quotas[self.member.uuid]['system'][resource]['limit']
1598
        # 100 from initial uplimit + 100 from project
1599
        self.assertEqual(newlimit, 200)
1600

    
1601
        remove_member_url = reverse('project_remove_member',
1602
                                    kwargs={'chain_id': app1_id, 'memb_id':
1603
                                            membership.id})
1604
        r = self.user_client.post(remove_member_url, follow=True)
1605
        self.assertEqual(r.status_code, 200)
1606

    
1607
        user_quotas = quotas.get_users_quotas([self.member])
1608
        resource = 'service1.resource'
1609
        newlimit = user_quotas[self.member.uuid]['system'][resource]['limit']
1610
        # 200 - 100 from project
1611
        self.assertEqual(newlimit, 100)
1612

    
1613

    
1614
ROOT = '/astakos/api/'
1615
u = lambda url: ROOT + url
1616

    
1617

    
1618
class QuotaAPITest(TestCase):
1619
    def test_0(self):
1620
        client = Client()
1621
        # custom service resources
1622
        service1 = Service.objects.create(
1623
            name="service1", api_url="http://service1.api")
1624
        resource11 = {"name": "service1.resource11",
1625
                      "desc": "resource11 desc",
1626
                      "allow_in_projects": True}
1627
        r, _ = resources.add_resource(service1, resource11)
1628
        resources.update_resource(r, 100)
1629
        resource12 = {"name": "service1.resource12",
1630
                      "desc": "resource11 desc",
1631
                      "unit": "bytes"}
1632
        r, _ = resources.add_resource(service1, resource12)
1633
        resources.update_resource(r, 1024)
1634

    
1635
        # create user
1636
        user = get_local_user('test@grnet.gr')
1637
        quotas.qh_sync_user(user)
1638

    
1639
        # create another service
1640
        service2 = Service.objects.create(
1641
            name="service2", api_url="http://service2.api")
1642
        resource21 = {"name": "service2.resource21",
1643
                      "desc": "resource11 desc",
1644
                      "allow_in_projects": False}
1645
        r, _ = resources.add_resource(service2, resource21)
1646
        resources.update_resource(r, 3)
1647

    
1648
        resource_names = [r['name'] for r in
1649
                          [resource11, resource12, resource21]]
1650

    
1651
        # get resources
1652
        r = client.get(u('resources'), follow=True)
1653
        self.assertEqual(r.status_code, 200)
1654
        body = json.loads(r.content)
1655
        for name in resource_names:
1656
            self.assertIn(name, body)
1657

    
1658
        # get quota
1659
        r = client.get(u('quotas'), follow=True)
1660
        self.assertEqual(r.status_code, 401)
1661

    
1662
        headers = {'HTTP_X_AUTH_TOKEN': user.auth_token}
1663
        r = client.get(u('quotas'), follow=True, **headers)
1664
        self.assertEqual(r.status_code, 200)
1665
        body = json.loads(r.content)
1666
        system_quota = body['system']
1667
        self.assertIn('system', body)
1668
        for name in resource_names:
1669
            self.assertIn(name, system_quota)
1670

    
1671
        r = client.get(u('service_quotas'), follow=True)
1672
        self.assertEqual(r.status_code, 401)
1673

    
1674
        s1_headers = {'HTTP_X_AUTH_TOKEN': service1.auth_token}
1675
        r = client.get(u('service_quotas'), follow=True, **s1_headers)
1676
        self.assertEqual(r.status_code, 200)
1677
        body = json.loads(r.content)
1678
        self.assertIn(user.uuid, body)
1679

    
1680
        r = client.get(u('commissions'), follow=True, **s1_headers)
1681
        self.assertEqual(r.status_code, 200)
1682
        body = json.loads(r.content)
1683
        self.assertEqual(body, [])
1684

    
1685
        # issue some commissions
1686
        commission_request = {
1687
            "force": False,
1688
            "auto_accept": False,
1689
            "name": "my commission",
1690
            "provisions": [
1691
                {
1692
                    "holder": user.uuid,
1693
                    "source": "system",
1694
                    "resource": resource11['name'],
1695
                    "quantity": 1
1696
                },
1697
                {
1698
                    "holder": user.uuid,
1699
                    "source": "system",
1700
                    "resource": resource12['name'],
1701
                    "quantity": 30000
1702
                }]}
1703

    
1704
        post_data = json.dumps(commission_request)
1705
        r = client.post(u('commissions'), post_data,
1706
                        content_type='application/json', **s1_headers)
1707
        self.assertEqual(r.status_code, 413)
1708

    
1709
        commission_request = {
1710
            "force": False,
1711
            "auto_accept": False,
1712
            "name": "my commission",
1713
            "provisions": [
1714
                {
1715
                    "holder": user.uuid,
1716
                    "source": "system",
1717
                    "resource": resource11['name'],
1718
                    "quantity": 1
1719
                },
1720
                {
1721
                    "holder": user.uuid,
1722
                    "source": "system",
1723
                    "resource": resource12['name'],
1724
                    "quantity": 100
1725
                }]}
1726

    
1727
        post_data = json.dumps(commission_request)
1728
        r = client.post(u('commissions'), post_data,
1729
                        content_type='application/json', **s1_headers)
1730
        self.assertEqual(r.status_code, 201)
1731
        body = json.loads(r.content)
1732
        serial = body['serial']
1733
        self.assertEqual(serial, 1)
1734

    
1735
        post_data = json.dumps(commission_request)
1736
        r = client.post(u('commissions'), post_data,
1737
                        content_type='application/json', **s1_headers)
1738
        self.assertEqual(r.status_code, 201)
1739
        body = json.loads(r.content)
1740
        self.assertEqual(body['serial'], 2)
1741

    
1742
        post_data = json.dumps(commission_request)
1743
        r = client.post(u('commissions'), post_data,
1744
                        content_type='application/json', **s1_headers)
1745
        self.assertEqual(r.status_code, 201)
1746
        body = json.loads(r.content)
1747
        self.assertEqual(body['serial'], 3)
1748

    
1749
        r = client.get(u('commissions'), follow=True, **s1_headers)
1750
        self.assertEqual(r.status_code, 200)
1751
        body = json.loads(r.content)
1752
        self.assertEqual(body, [1, 2, 3])
1753

    
1754
        r = client.get(u('commissions/' + str(serial)), follow=True,
1755
                       **s1_headers)
1756
        self.assertEqual(r.status_code, 200)
1757
        body = json.loads(r.content)
1758
        self.assertEqual(body['serial'], serial)
1759
        self.assertIn('issue_time', body)
1760
        self.assertEqual(body['provisions'], commission_request['provisions'])
1761
        self.assertEqual(body['name'], commission_request['name'])
1762

    
1763
        r = client.get(u('service_quotas?user=' + user.uuid),
1764
                       follow=True, **s1_headers)
1765
        self.assertEqual(r.status_code, 200)
1766
        body = json.loads(r.content)
1767
        user_quota = body[user.uuid]
1768
        system_quota = user_quota['system']
1769
        r11 = system_quota[resource11['name']]
1770
        self.assertEqual(r11['usage'], 3)
1771
        self.assertEqual(r11['pending'], 3)
1772

    
1773
        # resolve pending commissions
1774
        resolve_data = {
1775
            "accept": [1, 3],
1776
            "reject": [2, 3, 4],
1777
        }
1778
        post_data = json.dumps(resolve_data)
1779

    
1780
        r = client.post(u('commissions/action'), post_data,
1781
                        content_type='application/json', **s1_headers)
1782
        self.assertEqual(r.status_code, 200)
1783
        body = json.loads(r.content)
1784
        self.assertEqual(body['accepted'], [1])
1785
        self.assertEqual(body['rejected'], [2])
1786
        failed = body['failed']
1787
        self.assertEqual(len(failed), 2)
1788

    
1789
        r = client.get(u('commissions/' + str(serial)), follow=True,
1790
                       **s1_headers)
1791
        self.assertEqual(r.status_code, 404)
1792

    
1793
        # auto accept
1794
        commission_request = {
1795
            "auto_accept": True,
1796
            "name": "my commission",
1797
            "provisions": [
1798
                {
1799
                    "holder": user.uuid,
1800
                    "source": "system",
1801
                    "resource": resource11['name'],
1802
                    "quantity": 1
1803
                },
1804
                {
1805
                    "holder": user.uuid,
1806
                    "source": "system",
1807
                    "resource": resource12['name'],
1808
                    "quantity": 100
1809
                }]}
1810

    
1811
        post_data = json.dumps(commission_request)
1812
        r = client.post(u('commissions'), post_data,
1813
                        content_type='application/json', **s1_headers)
1814
        self.assertEqual(r.status_code, 201)
1815
        body = json.loads(r.content)
1816
        serial = body['serial']
1817
        self.assertEqual(serial, 4)
1818

    
1819
        r = client.get(u('commissions/' + str(serial)), follow=True,
1820
                       **s1_headers)
1821
        self.assertEqual(r.status_code, 404)
1822

    
1823
        # malformed
1824
        commission_request = {
1825
            "auto_accept": True,
1826
            "name": "my commission",
1827
            "provisions": [
1828
                {
1829
                    "holder": user.uuid,
1830
                    "source": "system",
1831
                    "resource": resource11['name'],
1832
                }
1833
            ]}
1834

    
1835
        post_data = json.dumps(commission_request)
1836
        r = client.post(u('commissions'), post_data,
1837
                        content_type='application/json', **s1_headers)
1838
        self.assertEqual(r.status_code, 400)
1839

    
1840
        commission_request = {
1841
            "auto_accept": True,
1842
            "name": "my commission",
1843
            "provisions": "dummy"}
1844

    
1845
        post_data = json.dumps(commission_request)
1846
        r = client.post(u('commissions'), post_data,
1847
                        content_type='application/json', **s1_headers)
1848
        self.assertEqual(r.status_code, 400)
1849

    
1850
        r = client.post(u('commissions'), commission_request,
1851
                        content_type='application/json', **s1_headers)
1852
        self.assertEqual(r.status_code, 400)
1853

    
1854
        # no holding
1855
        commission_request = {
1856
            "auto_accept": True,
1857
            "name": "my commission",
1858
            "provisions": [
1859
                {
1860
                    "holder": user.uuid,
1861
                    "source": "system",
1862
                    "resource": "non existent",
1863
                    "quantity": 1
1864
                },
1865
                {
1866
                    "holder": user.uuid,
1867
                    "source": "system",
1868
                    "resource": resource12['name'],
1869
                    "quantity": 100
1870
                }]}
1871

    
1872
        post_data = json.dumps(commission_request)
1873
        r = client.post(u('commissions'), post_data,
1874
                        content_type='application/json', **s1_headers)
1875
        self.assertEqual(r.status_code, 404)
1876

    
1877
        # release
1878
        commission_request = {
1879
            "provisions": [
1880
                {
1881
                    "holder": user.uuid,
1882
                    "source": "system",
1883
                    "resource": resource11['name'],
1884
                    "quantity": -1
1885
                }
1886
            ]}
1887

    
1888
        post_data = json.dumps(commission_request)
1889
        r = client.post(u('commissions'), post_data,
1890
                        content_type='application/json', **s1_headers)
1891
        self.assertEqual(r.status_code, 201)
1892
        body = json.loads(r.content)
1893
        serial = body['serial']
1894

    
1895
        accept_data = {'accept': ""}
1896
        post_data = json.dumps(accept_data)
1897
        r = client.post(u('commissions/' + str(serial) + '/action'), post_data,
1898
                        content_type='application/json', **s1_headers)
1899
        self.assertEqual(r.status_code, 200)
1900

    
1901
        reject_data = {'reject': ""}
1902
        post_data = json.dumps(accept_data)
1903
        r = client.post(u('commissions/' + str(serial) + '/action'), post_data,
1904
                        content_type='application/json', **s1_headers)
1905
        self.assertEqual(r.status_code, 404)
1906

    
1907
        # force
1908
        commission_request = {
1909
            "force": True,
1910
            "provisions": [
1911
                {
1912
                    "holder": user.uuid,
1913
                    "source": "system",
1914
                    "resource": resource11['name'],
1915
                    "quantity": 100
1916
                }]}
1917

    
1918
        post_data = json.dumps(commission_request)
1919
        r = client.post(u('commissions'), post_data,
1920
                        content_type='application/json', **s1_headers)
1921
        self.assertEqual(r.status_code, 201)
1922
        body = json.loads(r.content)
1923

    
1924
        r = client.get(u('quotas'), **headers)
1925
        self.assertEqual(r.status_code, 200)
1926
        body = json.loads(r.content)
1927
        system_quota = body['system']
1928
        r11 = system_quota[resource11['name']]
1929
        self.assertEqual(r11['usage'], 102)
1930
        self.assertEqual(r11['pending'], 101)