Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests.py @ ba27316a

History | View | Annotate | Download (70.7 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from contextlib import contextmanager
34

    
35
import copy
36
import datetime
37
import functools
38

    
39
from snf_django.utils.testing import with_settings, override_settings
40

    
41
from django.test import TestCase, Client
42
from django.core import mail
43
from django.http import SimpleCookie, HttpRequest, QueryDict
44
from django.utils.importlib import import_module
45
from django.utils import simplejson as json
46

    
47
from astakos.im.activation_backends import *
48
from astakos.im.target.shibboleth import Tokens as ShibbolethTokens
49
from astakos.im.models import *
50
from astakos.im import functions
51
from astakos.im import settings as astakos_settings
52
from astakos.im import forms
53

    
54
from urllib import quote
55
from datetime import timedelta
56

    
57
from astakos.im import messages
58
from astakos.im import auth_providers
59
from astakos.im import quotas
60
from astakos.im import resources
61

    
62
from django.conf import settings
63

    
64

    
65
# set some common settings
66
astakos_settings.EMAILCHANGE_ENABLED = True
67
astakos_settings.RECAPTCHA_ENABLED = False
68

    
69
settings.LOGGING_SETUP['disable_existing_loggers'] = False
70

    
71
# shortcut decorators to override provider settings
72
# e.g. shibboleth_settings(ENABLED=True) will set
73
# ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_ENABLED = True in global synnefo settings
74
prefixes = {'providers': 'AUTH_PROVIDER_',
75
            'shibboleth': 'ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_',
76
            'local': 'ASTAKOS_AUTH_PROVIDER_LOCAL_'}
77
im_settings = functools.partial(with_settings, astakos_settings)
78
shibboleth_settings = functools.partial(with_settings,
79
                                        settings,
80
                                        prefix=prefixes['shibboleth'])
81
localauth_settings = functools.partial(with_settings, settings,
82
                                       prefix=prefixes['local'])
83

    
84

    
85
class AstakosTestClient(Client):
86
    pass
87

    
88

    
89
class ShibbolethClient(AstakosTestClient):
90
    """
91
    A shibboleth agnostic client.
92
    """
93
    VALID_TOKENS = filter(lambda x: not x.startswith("_"),
94
                          dir(ShibbolethTokens))
95

    
96
    def __init__(self, *args, **kwargs):
97
        self.tokens = kwargs.pop('tokens', {})
98
        super(ShibbolethClient, self).__init__(*args, **kwargs)
99

    
100
    def set_tokens(self, **kwargs):
101
        for key, value in kwargs.iteritems():
102
            key = 'SHIB_%s' % key.upper()
103
            if not key in self.VALID_TOKENS:
104
                raise Exception('Invalid shibboleth token')
105

    
106
            self.tokens[key] = value
107

    
108
    def unset_tokens(self, *keys):
109
        for key in keys:
110
            key = 'SHIB_%s' % param.upper()
111
            if key in self.tokens:
112
                del self.tokens[key]
113

    
114
    def reset_tokens(self):
115
        self.tokens = {}
116

    
117
    def get_http_token(self, key):
118
        http_header = getattr(ShibbolethTokens, key)
119
        return http_header
120

    
121
    def request(self, **request):
122
        """
123
        Transform valid shibboleth tokens to http headers
124
        """
125
        for token, value in self.tokens.iteritems():
126
            request[self.get_http_token(token)] = value
127

    
128
        for param in request.keys():
129
            key = 'SHIB_%s' % param.upper()
130
            if key in self.VALID_TOKENS:
131
                request[self.get_http_token(key)] = request[param]
132
                del request[param]
133

    
134
        return super(ShibbolethClient, self).request(**request)
135

    
136

    
137
def get_user_client(username, password="password"):
138
    client = Client()
139
    client.login(username=username, password=password)
140
    return client
141

    
142

    
143
def get_local_user(username, **kwargs):
144
        try:
145
            return AstakosUser.objects.get(email=username)
146
        except:
147
            user_params = {
148
                'username': username,
149
                'email': username,
150
                'is_active': True,
151
                'activation_sent': datetime.now(),
152
                'email_verified': True
153
            }
154
            user_params.update(kwargs)
155
            user = AstakosUser(**user_params)
156
            user.set_password(kwargs.get('password', 'password'))
157
            user.save()
158
            user.add_auth_provider('local', auth_backend='astakos')
159
            if kwargs.get('is_active', True):
160
                user.is_active = True
161
            else:
162
                user.is_active = False
163
            user.save()
164
            return user
165

    
166

    
167
def get_mailbox(email):
168
    mails = []
169
    for sent_email in mail.outbox:
170
        for recipient in sent_email.recipients():
171
            if email in recipient:
172
                mails.append(sent_email)
173
    return mails
174

    
175

    
176
class ShibbolethTests(TestCase):
177
    """
178
    Testing shibboleth authentication.
179
    """
180

    
181
    fixtures = ['groups']
182

    
183
    def setUp(self):
184
        self.client = ShibbolethClient()
185
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
186
        astakos_settings.MODERATION_ENABLED = True
187

    
188
    @im_settings(FORCE_PROFILE_UPDATE=False)
189
    def test_create_account(self):
190

    
191
        client = ShibbolethClient()
192

    
193
        # shibboleth views validation
194
        # eepn required
195
        r = client.get('/im/login/shibboleth?', follow=True)
196
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
197
            'domain': astakos_settings.BASEURL,
198
            'contact_email': settings.CONTACT_EMAIL
199
        })
200
        client.set_tokens(eppn="kpapeppn")
201

    
202
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
203
        # shibboleth user info required
204
        r = client.get('/im/login/shibboleth?', follow=True)
205
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
206
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
207

    
208
        # shibboleth logged us in
209
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn",
210
                          cn="Kostas Papadimitriou",
211
                          ep_affiliation="Test Affiliation")
212
        r = client.get('/im/login/shibboleth?', follow=True)
213
        token = PendingThirdPartyUser.objects.get().token
214
        self.assertRedirects(r, '/im/signup?third_party_token=%s' % token)
215
        self.assertEqual(r.status_code, 200)
216

    
217
        # a new pending user created
218
        pending_user = PendingThirdPartyUser.objects.get(
219
            third_party_identifier="kpapeppn")
220
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
221
        # keep the token for future use
222
        token = pending_user.token
223
        # from now on no shibboleth headers are sent to the server
224
        client.reset_tokens()
225

    
226
        # this is the old way, it should fail, to avoid pending user take over
227
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
228
        self.assertEqual(r.status_code, 404)
229

    
230
        # this is the signup unique url associated with the pending user
231
        # created
232
        r = client.get('/im/signup/?third_party_token=%s' % token)
233
        identifier = pending_user.third_party_identifier
234
        post_data = {'third_party_identifier': identifier,
235
                     'first_name': 'Kostas',
236
                     'third_party_token': token,
237
                     'last_name': 'Mitroglou',
238
                     'provider': 'shibboleth'}
239

    
240
        signup_url = reverse('signup')
241

    
242
        # invlid email
243
        post_data['email'] = 'kpap'
244
        r = client.post(signup_url, post_data)
245
        self.assertContains(r, token)
246

    
247
        # existing email
248
        existing_user = get_local_user('test@test.com')
249
        post_data['email'] = 'test@test.com'
250
        r = client.post(signup_url, post_data)
251
        self.assertContains(r, messages.EMAIL_USED)
252
        existing_user.delete()
253

    
254
        # and finally a valid signup
255
        post_data['email'] = 'kpap@grnet.gr'
256
        r = client.post(signup_url, post_data, follow=True)
257
        self.assertContains(r, messages.NOTIFICATION_SENT)
258

    
259
        # everything is ok in our db
260
        self.assertEqual(AstakosUser.objects.count(), 1)
261
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
262
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
263

    
264
        # provider info stored
265
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
266
        self.assertEqual(provider.affiliation, 'Test Affiliation')
267
        self.assertEqual(provider.info, {u'email': u'kpap@grnet.gr',
268
                                         u'eppn': u'kpapeppn',
269
                                         u'name': u'Kostas Papadimitriou'})
270

    
271
        # lets login (not activated yet)
272
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn",
273
                          cn="Kostas Papadimitriou", )
274
        r = client.get("/im/login/shibboleth?", follow=True)
275
        self.assertContains(r, 'is pending moderation')
276

    
277
        # admin activates our user
278
        u = AstakosUser.objects.get(username="kpap@grnet.gr")
279
        functions.activate(u)
280
        self.assertEqual(u.is_active, True)
281

    
282
        # we see our profile
283
        r = client.get("/im/login/shibboleth?", follow=True)
284
        self.assertRedirects(r, '/im/landing')
285
        self.assertEqual(r.status_code, 200)
286

    
287
    def test_existing(self):
288
        """
289
        Test adding of third party login to an existing account
290
        """
291

    
292
        # this is our existing user
293
        existing_user = get_local_user('kpap@grnet.gr')
294
        existing_inactive = get_local_user('kpap-inactive@grnet.gr')
295
        existing_inactive.is_active = False
296
        existing_inactive.save()
297

    
298
        existing_unverified = get_local_user('kpap-unverified@grnet.gr')
299
        existing_unverified.is_active = False
300
        existing_unverified.activation_sent = None
301
        existing_unverified.email_verified = False
302
        existing_unverified.is_verified = False
303
        existing_unverified.save()
304

    
305
        client = ShibbolethClient()
306
        # shibboleth logged us in, notice that we use different email
307
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
308
                          cn="Kostas Papadimitriou", )
309
        r = client.get("/im/login/shibboleth?", follow=True)
310

    
311
        # a new pending user created
312
        pending_user = PendingThirdPartyUser.objects.get()
313
        token = pending_user.token
314
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
315
        pending_key = pending_user.token
316
        client.reset_tokens()
317
        self.assertRedirects(r, "/im/signup?third_party_token=%s" % token)
318

    
319
        form = r.context['login_form']
320
        signupdata = copy.copy(form.initial)
321
        signupdata['email'] = 'kpap@grnet.gr'
322
        signupdata['third_party_token'] = token
323
        signupdata['provider'] = 'shibboleth'
324
        signupdata.pop('id', None)
325

    
326
        # the email exists to another user
327
        r = client.post("/im/signup", signupdata)
328
        self.assertContains(r, "There is already an account with this email "
329
                               "address")
330
        # change the case, still cannot create
331
        signupdata['email'] = 'KPAP@grnet.GR'
332
        r = client.post("/im/signup", signupdata)
333
        self.assertContains(r, "There is already an account with this email "
334
                               "address")
335
        # inactive user
336
        signupdata['email'] = 'KPAP-inactive@grnet.GR'
337
        r = client.post("/im/signup", signupdata)
338
        self.assertContains(r, "There is already an account with this email "
339
                               "address")
340

    
341
        # unverified user, this should pass, old entry will be deleted
342
        signupdata['email'] = 'KAPAP-unverified@grnet.GR'
343
        r = client.post("/im/signup", signupdata)
344

    
345
        post_data = {'password': 'password',
346
                     'username': 'kpap@grnet.gr'}
347
        r = client.post('/im/local', post_data, follow=True)
348
        self.assertTrue(r.context['request'].user.is_authenticated())
349
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
350
                          cn="Kostas Papadimitriou", )
351
        r = client.get("/im/login/shibboleth?", follow=True)
352
        self.assertContains(r, "enabled for this account")
353
        client.reset_tokens()
354

    
355
        user = existing_user
356
        self.assertTrue(user.has_auth_provider('shibboleth'))
357
        self.assertTrue(user.has_auth_provider('local',
358
                                               auth_backend='astakos'))
359
        client.logout()
360

    
361
        # look Ma, i can login with both my shibboleth and local account
362
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
363
                          cn="Kostas Papadimitriou")
364
        r = client.get("/im/login/shibboleth?", follow=True)
365
        self.assertTrue(r.context['request'].user.is_authenticated())
366
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
367
        self.assertRedirects(r, '/im/landing')
368
        self.assertEqual(r.status_code, 200)
369
        client.logout()
370
        client.reset_tokens()
371

    
372
        # logged out
373
        r = client.get("/im/profile", follow=True)
374
        self.assertFalse(r.context['request'].user.is_authenticated())
375

    
376
        # login with local account also works
377
        post_data = {'password': 'password',
378
                     'username': 'kpap@grnet.gr'}
379
        r = self.client.post('/im/local', post_data, follow=True)
380
        self.assertTrue(r.context['request'].user.is_authenticated())
381
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
382
        self.assertRedirects(r, '/im/landing')
383
        self.assertEqual(r.status_code, 200)
384

    
385
        # cannot add the same eppn
386
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
387
                          cn="Kostas Papadimitriou", )
388
        r = client.get("/im/login/shibboleth?", follow=True)
389
        self.assertRedirects(r, '/im/landing')
390
        self.assertTrue(r.status_code, 200)
391
        self.assertEquals(existing_user.auth_providers.count(), 2)
392

    
393
        # only one allowed by default
394
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
395
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
396
        prov = auth_providers.get_provider('shibboleth')
397
        r = client.get("/im/login/shibboleth?", follow=True)
398
        self.assertContains(r, "Failed to add")
399
        self.assertRedirects(r, '/im/profile')
400
        self.assertTrue(r.status_code, 200)
401
        self.assertEquals(existing_user.auth_providers.count(), 2)
402
        client.logout()
403
        client.reset_tokens()
404

    
405
        # cannot login with another eppn
406
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppninvalid",
407
                          cn="Kostas Papadimitriou")
408
        r = client.get("/im/login/shibboleth?", follow=True)
409
        self.assertFalse(r.context['request'].user.is_authenticated())
410

    
411
        # cannot
412

    
413
        # lets remove local password
414
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
415
                                       email="kpap@grnet.gr")
416
        remove_local_url = user.get_auth_provider('local').get_remove_url
417
        remove_shibbo_url = user.get_auth_provider('shibboleth',
418
                                                   'kpapeppn').get_remove_url
419
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
420
                          cn="Kostas Papadimtriou")
421
        r = client.get("/im/login/shibboleth?", follow=True)
422
        client.reset_tokens()
423

    
424
        # TODO: this view should use POST
425
        r = client.get(remove_local_url)
426
        # 2 providers left
427
        self.assertEqual(user.auth_providers.count(), 1)
428
        # cannot remove last provider
429
        r = client.get(remove_shibbo_url)
430
        self.assertEqual(r.status_code, 403)
431
        self.client.logout()
432

    
433
        # cannot login using local credentials (notice we use another client)
434
        post_data = {'password': 'password',
435
                     'username': 'kpap@grnet.gr'}
436
        r = self.client.post('/im/local', post_data, follow=True)
437
        self.assertFalse(r.context['request'].user.is_authenticated())
438

    
439
        # we can reenable the local provider by setting a password
440
        r = client.get("/im/password_change", follow=True)
441
        r = client.post("/im/password_change", {'new_password1': '111',
442
                                                'new_password2': '111'},
443
                        follow=True)
444
        user = r.context['request'].user
445
        self.assertTrue(user.has_auth_provider('local'))
446
        self.assertTrue(user.has_auth_provider('shibboleth'))
447
        self.assertTrue(user.check_password('111'))
448
        self.assertTrue(user.has_usable_password())
449
        self.client.logout()
450

    
451
        # now we can login
452
        post_data = {'password': '111',
453
                     'username': 'kpap@grnet.gr'}
454
        r = self.client.post('/im/local', post_data, follow=True)
455
        self.assertTrue(r.context['request'].user.is_authenticated())
456

    
457
        client.reset_tokens()
458

    
459
        # we cannot take over another shibboleth identifier
460
        user2 = get_local_user('another@grnet.gr')
461
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
462
        # login
463
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
464
                          cn="Kostas Papadimitriou")
465
        r = client.get("/im/login/shibboleth?", follow=True)
466
        # try to assign existing shibboleth identifier of another user
467
        client.set_tokens(mail="kpap_second@shibboleth.gr",
468
                          eppn="existingeppn", cn="Kostas Papadimitriou")
469
        r = client.get("/im/login/shibboleth?", follow=True)
470
        self.assertContains(r, "this account is already assigned")
471

    
472

    
473
class TestLocal(TestCase):
474

    
475
    fixtures = ['groups']
476

    
477
    def setUp(self):
478
        settings.ADMINS = (('admin', 'support@cloud.grnet.gr'),)
479
        settings.SERVER_EMAIL = 'no-reply@grnet.gr'
480
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
481
        settings.ASTAKOS_MODERATION_ENABLED = True
482

    
483
    def tearDown(self):
484
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
485

    
486
    def test_no_moderation(self):
487
        # disable moderation
488
        astakos_settings.MODERATION_ENABLED = False
489

    
490
        # create a new user
491
        r = self.client.get("/im/signup")
492
        self.assertEqual(r.status_code, 200)
493
        data = {'email': 'kpap@grnet.gr', 'password1': 'password',
494
                'password2': 'password', 'first_name': 'Kostas',
495
                'last_name': 'Mitroglou', 'provider': 'local'}
496
        r = self.client.post("/im/signup", data)
497

    
498
        # user created
499
        self.assertEqual(AstakosUser.objects.count(), 1)
500
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
501
                                       email="kpap@grnet.gr")
502
        self.assertEqual(user.username, 'kpap@grnet.gr')
503
        self.assertEqual(user.has_auth_provider('local'), True)
504
        self.assertFalse(user.is_active)
505

    
506
        # user (but not admin) gets notified
507
        self.assertEqual(len(get_mailbox('support@cloud.grnet.gr')), 0)
508
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
509
        astakos_settings.MODERATION_ENABLED = True
510

    
511
    def test_email_case(self):
512
        data = {
513
            'email': 'kPap@grnet.gr',
514
            'password1': '1234',
515
            'password2': '1234'
516
        }
517

    
518
        form = forms.LocalUserCreationForm(data)
519
        self.assertTrue(form.is_valid())
520
        user = form.save()
521
        form.store_user(user, {})
522

    
523
        u = AstakosUser.objects.get()
524
        self.assertEqual(u.email, 'kPap@grnet.gr')
525
        self.assertEqual(u.username, 'kpap@grnet.gr')
526
        u.is_active = True
527
        u.email_verified = True
528
        u.save()
529

    
530
        data = {'username': 'kpap@grnet.gr', 'password': '1234'}
531
        login = forms.LoginForm(data=data)
532
        self.assertTrue(login.is_valid())
533

    
534
        data = {'username': 'KpaP@grnet.gr', 'password': '1234'}
535
        login = forms.LoginForm(data=data)
536
        self.assertTrue(login.is_valid())
537

    
538
        data = {
539
            'email': 'kpap@grnet.gr',
540
            'password1': '1234',
541
            'password2': '1234'
542
        }
543
        form = forms.LocalUserCreationForm(data)
544
        self.assertFalse(form.is_valid())
545

    
546
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),),
547
                 FORCE_PROFILE_UPDATE=False)
548
    def test_local_provider(self):
549
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
550
        # enable moderation
551
        astakos_settings.MODERATION_ENABLED = True
552

    
553
        # create a user
554
        r = self.client.get("/im/signup")
555
        self.assertEqual(r.status_code, 200)
556
        data = {'email': 'kpap@grnet.gr', 'password1': 'password',
557
                'password2': 'password', 'first_name': 'Kostas',
558
                'last_name': 'Mitroglou', 'provider': 'local'}
559
        r = self.client.post("/im/signup", data)
560

    
561
        # user created
562
        self.assertEqual(AstakosUser.objects.count(), 1)
563
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
564
                                       email="kpap@grnet.gr")
565
        self.assertEqual(user.username, 'kpap@grnet.gr')
566
        self.assertEqual(user.has_auth_provider('local'), True)
567
        self.assertFalse(user.is_active)  # not activated
568
        self.assertFalse(user.email_verified)  # not verified
569
        self.assertFalse(user.activation_sent)  # activation automatically sent
570

    
571
        # admin gets notified and activates the user from the command line
572
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
573
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
574
                                           'password': 'password'})
575
        self.assertContains(r, messages.NOTIFICATION_SENT)
576
        functions.send_activation(user)
577

    
578
        # user activation fields updated and user gets notified via email
579
        user = AstakosUser.objects.get(pk=user.pk)
580
        self.assertTrue(user.activation_sent)
581
        self.assertFalse(user.email_verified)
582
        self.assertFalse(user.is_active)
583
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
584

    
585
        # user forgot she got registered and tries to submit registration
586
        # form. Notice the upper case in email
587
        data = {'email': 'KPAP@grnet.gr', 'password1': 'password',
588
                'password2': 'password', 'first_name': 'Kostas',
589
                'last_name': 'Mitroglou', 'provider': 'local'}
590
        r = self.client.post("/im/signup", data, follow=True)
591
        self.assertRedirects(r, reverse('index'))
592
        self.assertContains(r, messages.NOTIFICATION_SENT)
593

    
594
        user = AstakosUser.objects.get()
595
        functions.send_activation(user)
596

    
597
        # previous user replaced
598
        self.assertTrue(user.activation_sent)
599
        self.assertFalse(user.email_verified)
600
        self.assertFalse(user.is_active)
601
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 1)
602

    
603
        # hmmm, email exists; lets request a password change
604
        r = self.client.get('/im/local/password_reset')
605
        self.assertEqual(r.status_code, 200)
606
        data = {'email': 'kpap@grnet.gr'}
607
        r = self.client.post('/im/local/password_reset', data, follow=True)
608
        # she can't because account is not active yet
609
        self.assertContains(r, 'pending activation')
610

    
611
        # moderation is enabled and an activation email has already been sent
612
        # so user can trigger resend of the activation email
613
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
614
        self.assertContains(r, 'has been sent to your email address.')
615
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 2)
616

    
617
        # also she cannot login
618
        data = {'username': 'kpap@grnet.gr', 'password': 'password'}
619
        r = self.client.post('/im/local', data, follow=True)
620
        self.assertContains(r, 'Resend activation')
621
        self.assertFalse(r.context['request'].user.is_authenticated())
622
        self.assertFalse('_pithos2_a' in self.client.cookies)
623

    
624
        # user sees the message and resends activation
625
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
626
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 3)
627

    
628
        # switch back moderation setting
629
        astakos_settings.MODERATION_ENABLED = True
630
        r = self.client.get(user.get_activation_url(), follow=True)
631
        self.assertRedirects(r, "/im/landing")
632
        r = self.client.get('/im/profile', follow=True)
633
        self.assertTrue(r.context['request'].user.is_authenticated())
634
        self.assertTrue('_pithos2_a' in self.client.cookies)
635
        self.assertContains(r, "KPAP@grnet.gr")
636
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 4)
637

    
638
        user = AstakosUser.objects.get(pk=user.pk)
639
        # user activated and logged in, token cookie set
640
        self.assertTrue(r.context['request'].user.is_authenticated())
641
        self.assertTrue('_pithos2_a' in self.client.cookies)
642
        cookies = self.client.cookies
643
        self.assertTrue(quote(user.auth_token) in
644
                        cookies.get('_pithos2_a').value)
645
        r = self.client.get('/im/logout', follow=True)
646
        r = self.client.get('/im/')
647
        # user logged out, token cookie removed
648
        self.assertFalse(r.context['request'].user.is_authenticated())
649
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
650

    
651
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
652
        del self.client.cookies['_pithos2_a']
653

    
654
        # user can login
655
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
656
                                           'password': 'password'},
657
                             follow=True)
658
        self.assertTrue(r.context['request'].user.is_authenticated())
659
        self.assertTrue('_pithos2_a' in self.client.cookies)
660
        cookies = self.client.cookies
661
        self.assertTrue(quote(user.auth_token) in
662
                        cookies.get('_pithos2_a').value)
663
        self.client.get('/im/logout', follow=True)
664

    
665
        # user forgot password
666
        old_pass = user.password
667
        r = self.client.get('/im/local/password_reset')
668
        self.assertEqual(r.status_code, 200)
669
        r = self.client.post('/im/local/password_reset', {'email':
670
                                                          'kpap@grnet.gr'})
671
        self.assertEqual(r.status_code, 302)
672
        # email sent
673
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 5)
674

    
675
        # user visits change password link
676
        # "Refresh" user because created url is based on last_login timestamp
677
        user = AstakosUser.objects.get(pk=user.pk)
678
        r = self.client.get(user.get_password_reset_url())
679
        r = self.client.post(user.get_password_reset_url(),
680
                             {'new_password1': 'newpass',
681
                              'new_password2': 'newpass'})
682

    
683
        user = AstakosUser.objects.get(pk=user.pk)
684
        self.assertNotEqual(old_pass, user.password)
685

    
686
        # old pass is not usable
687
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
688
                                           'password': 'password'})
689
        self.assertContains(r, 'Please enter a correct username and password')
690
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
691
                                           'password': 'newpass'},
692
                             follow=True)
693
        self.assertTrue(r.context['request'].user.is_authenticated())
694
        self.client.logout()
695

    
696
        # tests of special local backends
697
        user = AstakosUser.objects.get(pk=user.pk)
698
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
699
        user.save()
700

    
701
        # non astakos local backends do not support password reset
702
        r = self.client.get('/im/local/password_reset')
703
        self.assertEqual(r.status_code, 200)
704
        r = self.client.post('/im/local/password_reset', {'email':
705
                                                          'kpap@grnet.gr'})
706
        # she can't because account is not active yet
707
        self.assertContains(r, "Changing password is not")
708

    
709

    
710
class UserActionsTests(TestCase):
711

    
712
    def test_email_change(self):
713
        # to test existing email validation
714
        get_local_user('existing@grnet.gr')
715

    
716
        # local user
717
        user = get_local_user('kpap@grnet.gr')
718

    
719
        # login as kpap
720
        self.client.login(username='kpap@grnet.gr', password='password')
721
        r = self.client.get('/im/profile', follow=True)
722
        user = r.context['request'].user
723
        self.assertTrue(user.is_authenticated())
724

    
725
        # change email is enabled
726
        r = self.client.get('/im/email_change')
727
        self.assertEqual(r.status_code, 200)
728
        self.assertFalse(user.email_change_is_pending())
729

    
730
        # request email change to an existing email fails
731
        data = {'new_email_address': 'existing@grnet.gr'}
732
        r = self.client.post('/im/email_change', data)
733
        self.assertContains(r, messages.EMAIL_USED)
734

    
735
        # proper email change
736
        data = {'new_email_address': 'kpap@gmail.com'}
737
        r = self.client.post('/im/email_change', data, follow=True)
738
        self.assertRedirects(r, '/im/profile')
739
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
740
        change1 = EmailChange.objects.get()
741

    
742
        # user sees a warning
743
        r = self.client.get('/im/email_change')
744
        self.assertEqual(r.status_code, 200)
745
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
746
        self.assertTrue(user.email_change_is_pending())
747

    
748
        # link was sent
749
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 0)
750
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
751

    
752
        # proper email change
753
        data = {'new_email_address': 'kpap@yahoo.com'}
754
        r = self.client.post('/im/email_change', data, follow=True)
755
        self.assertRedirects(r, '/im/profile')
756
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
757
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 0)
758
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
759
        change2 = EmailChange.objects.get()
760

    
761
        r = self.client.get(change1.get_url())
762
        self.assertEquals(r.status_code, 302)
763
        self.client.logout()
764

    
765
        r = self.client.post('/im/local?next=' + change2.get_url(),
766
                             {'username': 'kpap@grnet.gr',
767
                              'password': 'password',
768
                              'next': change2.get_url()},
769
                             follow=True)
770
        self.assertRedirects(r, '/im/profile')
771
        user = r.context['request'].user
772
        self.assertEquals(user.email, 'kpap@yahoo.com')
773
        self.assertEquals(user.username, 'kpap@yahoo.com')
774

    
775
        self.client.logout()
776
        r = self.client.post('/im/local?next=' + change2.get_url(),
777
                             {'username': 'kpap@grnet.gr',
778
                              'password': 'password',
779
                              'next': change2.get_url()},
780
                             follow=True)
781
        self.assertContains(r, "Please enter a correct username and password")
782
        self.assertEqual(user.emailchanges.count(), 0)
783

    
784

    
785
class TestAuthProviderViews(TestCase):
786

    
787
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'],
788
                         AUTOMODERATE_POLICY=True)
789
    @im_settings(IM_MODULES=['shibboleth', 'local'], MODERATION_ENABLED=True,
790
                 FORCE_PROFILE_UPDATE=False)
791
    def test_user(self):
792
        Profile = AuthProviderPolicyProfile
793
        Pending = PendingThirdPartyUser
794
        User = AstakosUser
795

    
796
        User.objects.create(email="newuser@grnet.gr")
797
        get_local_user("olduser@grnet.gr")
798
        cl_olduser = ShibbolethClient()
799
        get_local_user("olduser2@grnet.gr")
800
        ShibbolethClient()
801
        cl_newuser = ShibbolethClient()
802
        cl_newuser2 = Client()
803

    
804
        academic_group, created = Group.objects.get_or_create(
805
            name='academic-login')
806
        academic_users = academic_group.user_set
807
        assert created
808
        policy_only_academic = Profile.objects.add_policy('academic_strict',
809
                                                          'shibboleth',
810
                                                          academic_group,
811
                                                          exclusive=True,
812
                                                          login=False,
813
                                                          add=False)
814

    
815

    
816
        # new academic user
817
        self.assertFalse(academic_users.filter(email='newuser@grnet.gr'))
818
        cl_newuser.set_tokens(eppn="newusereppn")
819
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
820
        pending = Pending.objects.get()
821
        identifier = pending.third_party_identifier
822
        signup_data = {'third_party_identifier': identifier,
823
                       'first_name': 'Academic',
824
                       'third_party_token': pending.token,
825
                       'last_name': 'New User',
826
                       'provider': 'shibboleth'}
827
        r = cl_newuser.post('/im/signup', signup_data)
828
        self.assertContains(r, "This field is required", )
829
        signup_data['email'] = 'olduser@grnet.gr'
830
        r = cl_newuser.post('/im/signup', signup_data)
831
        self.assertContains(r, "already an account with this email", )
832
        signup_data['email'] = 'newuser@grnet.gr'
833
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
834
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
835
        self.assertEqual(r.status_code, 404)
836
        newuser = User.objects.get(email="newuser@grnet.gr")
837
        activation_link = newuser.get_activation_url()
838
        self.assertTrue(academic_users.get(email='newuser@grnet.gr'))
839

    
840
        # new non-academic user
841
        signup_data = {'first_name': 'Non Academic',
842
                       'last_name': 'New User',
843
                       'provider': 'local',
844
                       'password1': 'password',
845
                       'password2': 'password'}
846
        signup_data['email'] = 'olduser@grnet.gr'
847
        r = cl_newuser2.post('/im/signup', signup_data)
848
        self.assertContains(r, 'There is already an account with this '
849
                               'email address')
850
        signup_data['email'] = 'newuser@grnet.gr'
851
        r = cl_newuser2.post('/im/signup/', signup_data)
852
        self.assertFalse(academic_users.filter(email='newuser@grnet.gr'))
853
        r = self.client.get(activation_link, follow=True)
854
        self.assertEqual(r.status_code, 400)
855
        newuser = User.objects.get(email="newuser@grnet.gr")
856
        self.assertFalse(newuser.activation_sent)
857
        r = self.client.get(newuser.get_activation_url(), follow=True)
858
        self.assertContains(r, "pending moderation")
859

    
860
        self.assertFalse(academic_users.filter(email='newuser@grnet.gr'))
861
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
862
        pending = Pending.objects.get()
863
        identifier = pending.third_party_identifier
864
        signup_data = {'third_party_identifier': identifier,
865
                       'first_name': 'Academic',
866
                       'third_party_token': pending.token,
867
                       'last_name': 'New User',
868
                       'provider': 'shibboleth'}
869
        signup_data['email'] = 'newuser@grnet.gr'
870
        r = cl_newuser.post('/im/signup', signup_data)
871
        newuser = User.objects.get(email="newuser@grnet.gr")
872
        self.assertTrue(newuser.activation_sent)
873
        activation_link = newuser.get_activation_url()
874
        self.assertTrue(academic_users.get(email='newuser@grnet.gr'))
875
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
876
        self.assertRedirects(r, '/im/landing')
877
        newuser = User.objects.get(email="newuser@grnet.gr")
878
        self.assertEqual(newuser.is_active, True)
879
        self.assertEqual(newuser.email_verified, True)
880
        cl_newuser.logout()
881

    
882
        # cannot reactivate if suspended
883
        newuser.is_active = False
884
        newuser.save()
885
        r = cl_newuser.get(newuser.get_activation_url())
886
        newuser = User.objects.get(email="newuser@grnet.gr")
887
        self.assertFalse(newuser.is_active)
888

    
889
        # release suspension
890
        newuser.is_active = True
891
        newuser.save()
892

    
893
        cl_newuser.get('/im/login/shibboleth?', follow=True)
894
        local = auth.get_provider('local', newuser)
895
        self.assertEqual(local.get_add_policy, False)
896
        self.assertEqual(local.get_login_policy, False)
897
        r = cl_newuser.get(local.get_add_url, follow=True)
898
        self.assertRedirects(r, '/im/profile')
899
        self.assertContains(r, 'disabled for your')
900

    
901
        cl_olduser.login(username='olduser@grnet.gr', password="password")
902
        r = cl_olduser.get('/im/profile', follow=True)
903
        self.assertEqual(r.status_code, 200)
904
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
905
        self.assertContains(r, 'Your request is missing a unique token')
906
        cl_olduser.set_tokens(eppn="newusereppn")
907
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
908
        self.assertContains(r, 'is already assigned to another user')
909
        cl_olduser.set_tokens(eppn="oldusereppn")
910
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
911
        self.assertContains(r, 'Academic login enabled for this account')
912

    
913
        user = User.objects.get(email="olduser@grnet.gr")
914
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
915
        local_provider = user.get_auth_provider('local')
916
        self.assertEqual(shib_provider.get_remove_policy, True)
917
        self.assertEqual(local_provider.get_remove_policy, True)
918

    
919

    
920
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
921
                                                          'shibboleth',
922
                                                          academic_group,
923
                                                          remove=False)
924
        user.groups.add(academic_group)
925
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
926
        local_provider = user.get_auth_provider('local')
927
        self.assertEqual(shib_provider.get_remove_policy, False)
928
        self.assertEqual(local_provider.get_remove_policy, True)
929
        self.assertEqual(local_provider.get_login_policy, False)
930

    
931
        cl_olduser.logout()
932
        login_data = {'username': 'olduser@grnet.gr', 'password': 'password'}
933
        r = cl_olduser.post('/im/local', login_data, follow=True)
934
        self.assertContains(r, "href='/im/login/shibboleth'>Academic login")
935

    
936

    
937
class TestAuthProvidersAPI(TestCase):
938
    """
939
    Test auth_providers module API
940
    """
941

    
942
    @im_settings(IM_MODULES=['local', 'shibboleth'])
943
    def test_create(self):
944
        user = AstakosUser.objects.create(email="kpap@grnet.gr")
945
        user2 = AstakosUser.objects.create(email="kpap2@grnet.gr")
946

    
947
        module = 'shibboleth'
948
        identifier = 'SHIB_UUID'
949
        provider_params = {
950
            'affiliation': 'UNIVERSITY',
951
            'info': {'age': 27}
952
        }
953
        provider = auth.get_provider(module, user2, identifier,
954
                                     **provider_params)
955
        provider.add_to_user()
956
        provider = auth.get_provider(module, user, identifier,
957
                                     **provider_params)
958
        provider.add_to_user()
959
        user.email_verified = True
960
        user.save()
961
        self.assertRaises(Exception, provider.add_to_user)
962
        provider = user.get_auth_provider(module, identifier)
963
        self.assertEqual(user.get_auth_provider(
964
            module, identifier)._instance.info.get('age'), 27)
965

    
966
        module = 'local'
967
        identifier = None
968
        provider_params = {'auth_backend': 'ldap', 'info':
969
                          {'office': 'A1'}}
970
        provider = auth.get_provider(module, user, identifier,
971
                                     **provider_params)
972
        provider.add_to_user()
973
        self.assertFalse(provider.get_add_policy)
974
        self.assertRaises(Exception, provider.add_to_user)
975

    
976
        shib = user.get_auth_provider('shibboleth',
977
                                      'SHIB_UUID')
978
        self.assertTrue(shib.get_remove_policy)
979

    
980
        local = user.get_auth_provider('local')
981
        self.assertTrue(local.get_remove_policy)
982

    
983
        local.remove_from_user()
984
        self.assertFalse(shib.get_remove_policy)
985
        self.assertRaises(Exception, shib.remove_from_user)
986

    
987
        provider = user.get_auth_providers()[0]
988
        self.assertRaises(Exception, provider.add_to_user)
989

    
990
    @im_settings(IM_MODULES=['local', 'shibboleth'])
991
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'],
992
                         CREATION_GROUPS_POLICY=['group-create', 'group1',
993
                                                 'group2'])
994
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'],
995
                        CREATION_GROUPS_POLICY=['localgroup-create',
996
                                                'group-create'])
997
    def test_add_groups(self):
998
        user = AstakosUser.objects.create(email="kpap@grnet.gr")
999
        provider = auth.get_provider('shibboleth', user, 'test123')
1000
        provider.add_to_user()
1001
        user = AstakosUser.objects.get()
1002
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1003
                         sorted([u'group1', u'group2', u'group-create']))
1004

    
1005
        local = auth.get_provider('local', user)
1006
        local.add_to_user()
1007
        provider = user.get_auth_provider('shibboleth')
1008
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1009
        provider.remove_from_user()
1010
        user = AstakosUser.objects.get()
1011
        self.assertEqual(len(user.get_auth_providers()), 1)
1012
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1013
                         sorted([u'group-create', u'localgroup']))
1014

    
1015
        local = user.get_auth_provider('local')
1016
        self.assertRaises(Exception, local.remove_from_user)
1017
        provider = auth.get_provider('shibboleth', user, 'test123')
1018
        provider.add_to_user()
1019
        user = AstakosUser.objects.get()
1020
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1021
                         sorted([u'group-create', u'group1', u'group2',
1022
                                 u'localgroup']))
1023

    
1024
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1025
    def test_policies(self):
1026
        group_old, created = Group.objects.get_or_create(name='olduser')
1027

    
1028
        astakos_settings.MODERATION_ENABLED = True
1029
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1030
            ['academic-user']
1031
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1032
            ['google-user']
1033

    
1034
        user = AstakosUser.objects.create(email="kpap@grnet.gr")
1035
        user.groups.add(group_old)
1036
        user.add_auth_provider('local')
1037

    
1038
        user2 = AstakosUser.objects.create(email="kpap2@grnet.gr")
1039
        user2.add_auth_provider('shibboleth', identifier='shibid')
1040

    
1041
        user3 = AstakosUser.objects.create(email="kpap3@grnet.gr")
1042
        user3.groups.add(group_old)
1043
        user3.add_auth_provider('local')
1044
        user3.add_auth_provider('shibboleth', identifier='1234')
1045

    
1046
        self.assertTrue(user2.groups.get(name='academic-user'))
1047
        self.assertFalse(user2.groups.filter(name='olduser').count())
1048

    
1049
        local = auth_providers.get_provider('local')
1050
        self.assertTrue(local.get_add_policy)
1051

    
1052
        academic_group = Group.objects.get(name='academic-user')
1053
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1054
                                                     academic_group,
1055
                                                     exclusive=True,
1056
                                                     add=False,
1057
                                                     login=False)
1058
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1059
                                                     academic_group,
1060
                                                     exclusive=True,
1061
                                                     login=False,
1062
                                                     add=False)
1063
        # no duplicate entry gets created
1064
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1065

    
1066
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1067
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1068
                                                     user2,
1069
                                                     remove=False)
1070
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1071

    
1072
        local = auth_providers.get_provider('local', user2)
1073
        google = auth_providers.get_provider('google', user2)
1074
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1075
        self.assertTrue(shibboleth.get_login_policy)
1076
        self.assertFalse(shibboleth.get_remove_policy)
1077
        self.assertFalse(local.get_add_policy)
1078
        self.assertFalse(local.get_add_policy)
1079
        self.assertFalse(google.get_add_policy)
1080

    
1081
        user2.groups.remove(Group.objects.get(name='academic-user'))
1082
        self.assertTrue(local.get_add_policy)
1083
        self.assertTrue(google.get_add_policy)
1084
        user2.groups.add(Group.objects.get(name='academic-user'))
1085

    
1086
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1087
                                                     user2,
1088
                                                     exclusive=True,
1089
                                                     add=True)
1090
        self.assertTrue(local.get_add_policy)
1091
        self.assertTrue(google.get_add_policy)
1092

    
1093
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1094
        self.assertFalse(local.get_automoderate_policy)
1095
        self.assertFalse(google.get_automoderate_policy)
1096
        self.assertTrue(shibboleth.get_automoderate_policy)
1097

    
1098
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1099
                  'GOOGLE_ADD_GROUPS_POLICY']:
1100
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1101

    
1102

    
1103
    @shibboleth_settings(CREATE_POLICY=True)
1104
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1105
    def test_create_http(self):
1106
        # this should be wrapped inside a transaction
1107
        user = AstakosUser(email="test@test.com")
1108
        user.save()
1109
        provider = auth_providers.get_provider('shibboleth', user,
1110
                                               'test@academia.test')
1111
        provider.add_to_user()
1112
        user.get_auth_provider('shibboleth', 'test@academia.test')
1113
        provider = auth_providers.get_provider('local', user)
1114
        provider.add_to_user()
1115
        user.get_auth_provider('local')
1116

    
1117
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1118
        user = AstakosUser(email="test2@test.com")
1119
        user.save()
1120
        provider = auth_providers.get_provider('shibboleth', user,
1121
                                               'test@shibboleth.com',
1122
                                               **{'info': {'name':
1123
                                                                'User Test'}})
1124
        self.assertFalse(provider.get_create_policy)
1125
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1126
        self.assertTrue(provider.get_create_policy)
1127
        academic = provider.add_to_user()
1128

    
1129
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1130
    @shibboleth_settings(LIMIT_POLICY=2)
1131
    def test_policies(self):
1132
        user = get_local_user('kpap@grnet.gr')
1133
        user.add_auth_provider('shibboleth', identifier='1234')
1134
        user.add_auth_provider('shibboleth', identifier='12345')
1135

    
1136
        # default limit is 1
1137
        local = user.get_auth_provider('local')
1138
        self.assertEqual(local.get_add_policy, False)
1139

    
1140
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1141
        academic = user.get_auth_provider('shibboleth',
1142
                                          identifier='1234')
1143
        self.assertEqual(academic.get_add_policy, False)
1144
        newacademic = auth_providers.get_provider('shibboleth', user,
1145
                                                  identifier='123456')
1146
        self.assertEqual(newacademic.get_add_policy, True)
1147
        user.add_auth_provider('shibboleth', identifier='123456')
1148
        self.assertEqual(academic.get_add_policy, False)
1149
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1150

    
1151
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1152
    @shibboleth_settings(LIMIT_POLICY=2)
1153
    def test_messages(self):
1154
        user = get_local_user('kpap@grnet.gr')
1155
        user.add_auth_provider('shibboleth', identifier='1234')
1156
        user.add_auth_provider('shibboleth', identifier='12345')
1157
        provider = auth_providers.get_provider('shibboleth')
1158
        self.assertEqual(provider.get_message('title'), 'Academic')
1159
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1160
        # regenerate messages cache
1161
        provider = auth_providers.get_provider('shibboleth')
1162
        self.assertEqual(provider.get_message('title'), 'New title')
1163
        self.assertEqual(provider.get_message('login_title'),
1164
                         'New title LOGIN')
1165
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1166
        self.assertEqual(provider.get_module_icon,
1167
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1168
        self.assertEqual(provider.get_module_medium_icon,
1169
                         settings.MEDIA_URL +
1170
                         'im/auth/icons-medium/shibboleth.png')
1171

    
1172
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1173
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1174
        self.assertEqual(provider.get_method_details_msg,
1175
                         'Account: 12345')
1176
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1177
        self.assertEqual(provider.get_method_details_msg,
1178
                         'Account: 1234')
1179

    
1180
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1181
        self.assertEqual(provider.get_not_active_msg,
1182
                         "'Academic login' is disabled.")
1183

    
1184
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1185
    @shibboleth_settings(LIMIT_POLICY=2)
1186
    def test_templates(self):
1187
        user = get_local_user('kpap@grnet.gr')
1188
        user.add_auth_provider('shibboleth', identifier='1234')
1189
        user.add_auth_provider('shibboleth', identifier='12345')
1190

    
1191
        provider = auth_providers.get_provider('shibboleth')
1192
        self.assertEqual(provider.get_template('login'),
1193
                         'im/auth/shibboleth_login.html')
1194
        provider = auth_providers.get_provider('google')
1195
        self.assertEqual(provider.get_template('login'),
1196
                         'im/auth/generic_login.html')
1197

    
1198

    
1199
class TestProjects(TestCase):
1200
    """
1201
    Test projects.
1202
    """
1203
    def setUp(self):
1204
        # astakos resources
1205
        self.astakos_service = Service.objects.create(name="astakos",
1206
                                                      api_url="/astakos/api/")
1207
        self.resource = Resource.objects.create(name="astakos.pending_app",
1208
                                                uplimit=0,
1209
                                                allow_in_projects=False,
1210
                                                service=self.astakos_service)
1211

    
1212
        # custom service resources
1213
        self.service = Service.objects.create(name="service1",
1214
                                              api_url="http://service.api")
1215
        self.resource = Resource.objects.create(name="service1.resource",
1216
                                                uplimit=100,
1217
                                                service=self.service)
1218
        self.admin = get_local_user("projects-admin@synnefo.org")
1219
        self.admin.uuid = 'uuid1'
1220
        self.admin.save()
1221

    
1222
        self.user = get_local_user("user@synnefo.org")
1223
        self.member = get_local_user("member@synnefo.org")
1224
        self.member2 = get_local_user("member2@synnefo.org")
1225

    
1226
        self.admin_client = get_user_client("projects-admin@synnefo.org")
1227
        self.user_client = get_user_client("user@synnefo.org")
1228
        self.member_client = get_user_client("member@synnefo.org")
1229
        self.member2_client = get_user_client("member2@synnefo.org")
1230

    
1231
        quotas.qh_sync_users(AstakosUser.objects.all())
1232

    
1233
    @im_settings(PROJECT_ADMINS=['uuid1'])
1234
    def test_application_limit(self):
1235
        # user cannot create a project
1236
        r = self.user_client.get(reverse('project_add'), follow=True)
1237
        self.assertRedirects(r, reverse('project_list'))
1238
        self.assertContains(r, "You are not allowed to create a new project")
1239

    
1240
        # but admin can
1241
        r = self.admin_client.get(reverse('project_add'), follow=True)
1242
        self.assertRedirects(r, reverse('project_add'))
1243

    
1244
    @im_settings(PROJECT_ADMINS=['uuid1'])
1245
    def test_allow_in_project(self):
1246
        dfrom = datetime.now()
1247
        dto = datetime.now() + timedelta(days=30)
1248

    
1249
        # astakos.pending_uplimit allow_in_project flag is False
1250
        # we shouldn't be able to create a project application using this
1251
        # resource.
1252
        application_data = {
1253
            'name': 'project.synnefo.org',
1254
            'homepage': 'https://www.synnefo.org',
1255
            'start_date': dfrom.strftime("%Y-%m-%d"),
1256
            'end_date': dto.strftime("%Y-%m-%d"),
1257
            'member_join_policy': 2,
1258
            'member_leave_policy': 1,
1259
            'service1.resource_uplimit': 100,
1260
            'is_selected_service1.resource': "1",
1261
            'astakos.pending_app_uplimit': 100,
1262
            'is_selected_accounts': "1",
1263
            'user': self.user.pk
1264
        }
1265
        form = forms.ProjectApplicationForm(data=application_data)
1266
        # form is invalid
1267
        self.assertEqual(form.is_valid(), False)
1268

    
1269
        del application_data['astakos.pending_app_uplimit']
1270
        del application_data['is_selected_accounts']
1271
        form = forms.ProjectApplicationForm(data=application_data)
1272
        self.assertEqual(form.is_valid(), True)
1273

    
1274
    @im_settings(PROJECT_ADMINS=['uuid1'])
1275
    def test_applications(self):
1276
        # let user have 2 pending applications
1277
        quotas.add_base_quota(self.user, 'astakos.pending_app', 2)
1278

    
1279
        r = self.user_client.get(reverse('project_add'), follow=True)
1280
        self.assertRedirects(r, reverse('project_add'))
1281

    
1282
        # user fills the project application form
1283
        post_url = reverse('project_add') + '?verify=1'
1284
        dfrom = datetime.now()
1285
        dto = datetime.now() + timedelta(days=30)
1286
        application_data = {
1287
            'name': 'project.synnefo.org',
1288
            'homepage': 'https://www.synnefo.org',
1289
            'start_date': dfrom.strftime("%Y-%m-%d"),
1290
            'end_date': dto.strftime("%Y-%m-%d"),
1291
            'member_join_policy': 2,
1292
            'member_leave_policy': 1,
1293
            'service1.resource_uplimit': 100,
1294
            'is_selected_service1.resource': "1",
1295
            'user': self.user.pk
1296
        }
1297
        r = self.user_client.post(post_url, data=application_data, follow=True)
1298
        self.assertEqual(r.status_code, 200)
1299
        self.assertEqual(r.context['form'].is_valid(), True)
1300

    
1301
        # confirm request
1302
        post_url = reverse('project_add') + '?verify=0&edit=0'
1303
        r = self.user_client.post(post_url, data=application_data, follow=True)
1304
        self.assertContains(r, "The project application has been received")
1305
        self.assertRedirects(r, reverse('project_list'))
1306
        self.assertEqual(ProjectApplication.objects.count(), 1)
1307
        app1_id = ProjectApplication.objects.filter().order_by('pk')[0].pk
1308

    
1309
        # create another one
1310
        application_data['name'] = 'project2.synnefo.org'
1311
        r = self.user_client.post(post_url, data=application_data, follow=True)
1312
        app2_id = ProjectApplication.objects.filter().order_by('pk')[1].pk
1313

    
1314
        # no more applications (LIMIT is 2)
1315
        r = self.user_client.get(reverse('project_add'), follow=True)
1316
        self.assertRedirects(r, reverse('project_list'))
1317
        self.assertContains(r, "You are not allowed to create a new project")
1318

    
1319
        # login
1320
        self.admin_client.get(reverse("edit_profile"))
1321
        # admin approves
1322
        r = self.admin_client.post(reverse('project_app_approve',
1323
                                           kwargs={'application_id': app1_id}),
1324
                                   follow=True)
1325
        self.assertEqual(r.status_code, 200)
1326

    
1327
        # project created
1328
        self.assertEqual(Project.objects.count(), 1)
1329

    
1330
        # login
1331
        self.member_client.get(reverse("edit_profile"))
1332
        # cannot join app2 (not approved yet)
1333
        join_url = reverse("project_join", kwargs={'chain_id': app2_id})
1334
        r = self.member_client.post(join_url, follow=True)
1335
        self.assertEqual(r.status_code, 403)
1336

    
1337
        # can join app1
1338
        self.member_client.get(reverse("edit_profile"))
1339
        join_url = reverse("project_join", kwargs={'chain_id': app1_id})
1340
        r = self.member_client.post(join_url, follow=True)
1341
        self.assertEqual(r.status_code, 200)
1342

    
1343
        memberships = ProjectMembership.objects.all()
1344
        self.assertEqual(len(memberships), 1)
1345
        memb_id = memberships[0].id
1346

    
1347
        reject_member_url = reverse('project_reject_member',
1348
                                    kwargs={'chain_id': app1_id, 'memb_id':
1349
                                            memb_id})
1350
        accept_member_url = reverse('project_accept_member',
1351
                                    kwargs={'chain_id': app1_id, 'memb_id':
1352
                                            memb_id})
1353

    
1354
        # only project owner is allowed to reject
1355
        r = self.member_client.post(reject_member_url, follow=True)
1356
        self.assertContains(r, "You do not have the permissions")
1357
        self.assertEqual(r.status_code, 200)
1358

    
1359
        # user (owns project) rejects membership
1360
        r = self.user_client.post(reject_member_url, follow=True)
1361
        self.assertEqual(ProjectMembership.objects.count(), 0)
1362

    
1363
        # user rejoins
1364
        self.member_client.get(reverse("edit_profile"))
1365
        join_url = reverse("project_join", kwargs={'chain_id': app1_id})
1366
        r = self.member_client.post(join_url, follow=True)
1367
        self.assertEqual(r.status_code, 200)
1368
        self.assertEqual(ProjectMembership.objects.count(), 1)
1369

    
1370
        # user (owns project) accepts membership
1371
        r = self.user_client.post(accept_member_url, follow=True)
1372
        self.assertEqual(ProjectMembership.objects.count(), 1)
1373
        membership = ProjectMembership.objects.get()
1374
        self.assertEqual(membership.state, ProjectMembership.ACCEPTED)
1375

    
1376
        user_quotas = quotas.get_users_quotas([self.member])
1377
        resource = 'service1.resource'
1378
        newlimit = user_quotas[self.member.uuid]['system'][resource]['limit']
1379
        # 100 from initial uplimit + 100 from project
1380
        self.assertEqual(newlimit, 200)
1381

    
1382
        remove_member_url = reverse('project_remove_member',
1383
                                    kwargs={'chain_id': app1_id, 'memb_id':
1384
                                            membership.id})
1385
        r = self.user_client.post(remove_member_url, follow=True)
1386
        self.assertEqual(r.status_code, 200)
1387

    
1388
        user_quotas = quotas.get_users_quotas([self.member])
1389
        resource = 'service1.resource'
1390
        newlimit = user_quotas[self.member.uuid]['system'][resource]['limit']
1391
        # 200 - 100 from project
1392
        self.assertEqual(newlimit, 100)
1393

    
1394

    
1395
ROOT = '/astakos/api/'
1396
u = lambda url: ROOT + url
1397

    
1398

    
1399
class QuotaAPITest(TestCase):
1400
    def test_0(self):
1401
        client = Client()
1402
        # custom service resources
1403
        service1 = Service.objects.create(
1404
            name="service1", api_url="http://service1.api")
1405
        resource11 = {"name": "service1.resource11",
1406
                      "desc": "resource11 desc",
1407
                      "allow_in_projects": True}
1408
        r, _ = resources.add_resource(service1, resource11)
1409
        resources.update_resource(r, 100)
1410
        resource12 = {"name": "service1.resource12",
1411
                      "desc": "resource11 desc",
1412
                      "unit": "bytes"}
1413
        r, _ = resources.add_resource(service1, resource12)
1414
        resources.update_resource(r, 1024)
1415

    
1416
        # create user
1417
        user = get_local_user('test@grnet.gr')
1418
        quotas.qh_sync_user(user)
1419

    
1420
        # create another service
1421
        service2 = Service.objects.create(
1422
            name="service2", api_url="http://service2.api")
1423
        resource21 = {"name": "service2.resource21",
1424
                      "desc": "resource11 desc",
1425
                      "allow_in_projects": False}
1426
        r, _ = resources.add_resource(service2, resource21)
1427
        resources.update_resource(r, 3)
1428

    
1429
        resource_names = [r['name'] for r in
1430
                          [resource11, resource12, resource21]]
1431

    
1432
        # get resources
1433
        r = client.get(u('resources'), follow=True)
1434
        self.assertEqual(r.status_code, 200)
1435
        body = json.loads(r.content)
1436
        for name in resource_names:
1437
            self.assertIn(name, body)
1438

    
1439
        # get quota
1440
        r = client.get(u('quotas'), follow=True)
1441
        self.assertEqual(r.status_code, 401)
1442

    
1443
        headers = {'HTTP_X_AUTH_TOKEN': user.auth_token}
1444
        r = client.get(u('quotas'), follow=True, **headers)
1445
        self.assertEqual(r.status_code, 200)
1446
        body = json.loads(r.content)
1447
        system_quota = body['system']
1448
        self.assertIn('system', body)
1449
        for name in resource_names:
1450
            self.assertIn(name, system_quota)
1451

    
1452
        r = client.get(u('service_quotas'), follow=True)
1453
        self.assertEqual(r.status_code, 401)
1454

    
1455
        s1_headers = {'HTTP_X_AUTH_TOKEN': service1.auth_token}
1456
        r = client.get(u('service_quotas'), follow=True, **s1_headers)
1457
        self.assertEqual(r.status_code, 200)
1458
        body = json.loads(r.content)
1459
        self.assertIn(user.uuid, body)
1460

    
1461
        r = client.get(u('commissions'), follow=True, **s1_headers)
1462
        self.assertEqual(r.status_code, 200)
1463
        body = json.loads(r.content)
1464
        self.assertEqual(body, [])
1465

    
1466
        # issue some commissions
1467
        commission_request = {
1468
            "force": False,
1469
            "auto_accept": False,
1470
            "name": "my commission",
1471
            "provisions": [
1472
                {
1473
                    "holder": user.uuid,
1474
                    "source": "system",
1475
                    "resource": resource11['name'],
1476
                    "quantity": 1
1477
                },
1478
                {
1479
                    "holder": user.uuid,
1480
                    "source": "system",
1481
                    "resource": resource12['name'],
1482
                    "quantity": 30000
1483
                }]}
1484

    
1485
        post_data = json.dumps(commission_request)
1486
        r = client.post(u('commissions'), post_data,
1487
                        content_type='application/json', **s1_headers)
1488
        self.assertEqual(r.status_code, 413)
1489

    
1490
        commission_request = {
1491
            "force": False,
1492
            "auto_accept": False,
1493
            "name": "my commission",
1494
            "provisions": [
1495
                {
1496
                    "holder": user.uuid,
1497
                    "source": "system",
1498
                    "resource": resource11['name'],
1499
                    "quantity": 1
1500
                },
1501
                {
1502
                    "holder": user.uuid,
1503
                    "source": "system",
1504
                    "resource": resource12['name'],
1505
                    "quantity": 100
1506
                }]}
1507

    
1508
        post_data = json.dumps(commission_request)
1509
        r = client.post(u('commissions'), post_data,
1510
                        content_type='application/json', **s1_headers)
1511
        self.assertEqual(r.status_code, 201)
1512
        body = json.loads(r.content)
1513
        serial = body['serial']
1514
        self.assertEqual(serial, 1)
1515

    
1516
        post_data = json.dumps(commission_request)
1517
        r = client.post(u('commissions'), post_data,
1518
                        content_type='application/json', **s1_headers)
1519
        self.assertEqual(r.status_code, 201)
1520
        body = json.loads(r.content)
1521
        self.assertEqual(body['serial'], 2)
1522

    
1523
        post_data = json.dumps(commission_request)
1524
        r = client.post(u('commissions'), post_data,
1525
                        content_type='application/json', **s1_headers)
1526
        self.assertEqual(r.status_code, 201)
1527
        body = json.loads(r.content)
1528
        self.assertEqual(body['serial'], 3)
1529

    
1530
        r = client.get(u('commissions'), follow=True, **s1_headers)
1531
        self.assertEqual(r.status_code, 200)
1532
        body = json.loads(r.content)
1533
        self.assertEqual(body, [1, 2, 3])
1534

    
1535
        r = client.get(u('commissions/' + str(serial)), follow=True,
1536
                       **s1_headers)
1537
        self.assertEqual(r.status_code, 200)
1538
        body = json.loads(r.content)
1539
        self.assertEqual(body['serial'], serial)
1540
        self.assertIn('issue_time', body)
1541
        self.assertEqual(body['provisions'], commission_request['provisions'])
1542
        self.assertEqual(body['name'], commission_request['name'])
1543

    
1544
        r = client.get(u('service_quotas?user=' + user.uuid),
1545
                       follow=True, **s1_headers)
1546
        self.assertEqual(r.status_code, 200)
1547
        body = json.loads(r.content)
1548
        user_quota = body[user.uuid]
1549
        system_quota = user_quota['system']
1550
        r11 = system_quota[resource11['name']]
1551
        self.assertEqual(r11['usage'], 3)
1552
        self.assertEqual(r11['pending'], 3)
1553

    
1554
        # resolve pending commissions
1555
        resolve_data = {
1556
            "accept": [1, 3],
1557
            "reject": [2, 3, 4],
1558
        }
1559
        post_data = json.dumps(resolve_data)
1560

    
1561
        r = client.post(u('commissions/action'), post_data,
1562
                        content_type='application/json', **s1_headers)
1563
        self.assertEqual(r.status_code, 200)
1564
        body = json.loads(r.content)
1565
        self.assertEqual(body['accepted'], [1])
1566
        self.assertEqual(body['rejected'], [2])
1567
        failed = body['failed']
1568
        self.assertEqual(len(failed), 2)
1569

    
1570
        r = client.get(u('commissions/' + str(serial)), follow=True,
1571
                       **s1_headers)
1572
        self.assertEqual(r.status_code, 404)
1573

    
1574
        # auto accept
1575
        commission_request = {
1576
            "auto_accept": True,
1577
            "name": "my commission",
1578
            "provisions": [
1579
                {
1580
                    "holder": user.uuid,
1581
                    "source": "system",
1582
                    "resource": resource11['name'],
1583
                    "quantity": 1
1584
                },
1585
                {
1586
                    "holder": user.uuid,
1587
                    "source": "system",
1588
                    "resource": resource12['name'],
1589
                    "quantity": 100
1590
                }]}
1591

    
1592
        post_data = json.dumps(commission_request)
1593
        r = client.post(u('commissions'), post_data,
1594
                        content_type='application/json', **s1_headers)
1595
        self.assertEqual(r.status_code, 201)
1596
        body = json.loads(r.content)
1597
        serial = body['serial']
1598
        self.assertEqual(serial, 4)
1599

    
1600
        r = client.get(u('commissions/' + str(serial)), follow=True,
1601
                       **s1_headers)
1602
        self.assertEqual(r.status_code, 404)
1603

    
1604
        # malformed
1605
        commission_request = {
1606
            "auto_accept": True,
1607
            "name": "my commission",
1608
            "provisions": [
1609
                {
1610
                    "holder": user.uuid,
1611
                    "source": "system",
1612
                    "resource": resource11['name'],
1613
                }
1614
            ]}
1615

    
1616
        post_data = json.dumps(commission_request)
1617
        r = client.post(u('commissions'), post_data,
1618
                        content_type='application/json', **s1_headers)
1619
        self.assertEqual(r.status_code, 400)
1620

    
1621
        commission_request = {
1622
            "auto_accept": True,
1623
            "name": "my commission",
1624
            "provisions": "dummy"}
1625

    
1626
        post_data = json.dumps(commission_request)
1627
        r = client.post(u('commissions'), post_data,
1628
                        content_type='application/json', **s1_headers)
1629
        self.assertEqual(r.status_code, 400)
1630

    
1631
        r = client.post(u('commissions'), commission_request,
1632
                        content_type='application/json', **s1_headers)
1633
        self.assertEqual(r.status_code, 400)
1634

    
1635
        # no holding
1636
        commission_request = {
1637
            "auto_accept": True,
1638
            "name": "my commission",
1639
            "provisions": [
1640
                {
1641
                    "holder": user.uuid,
1642
                    "source": "system",
1643
                    "resource": "non existent",
1644
                    "quantity": 1
1645
                },
1646
                {
1647
                    "holder": user.uuid,
1648
                    "source": "system",
1649
                    "resource": resource12['name'],
1650
                    "quantity": 100
1651
                }]}
1652

    
1653
        post_data = json.dumps(commission_request)
1654
        r = client.post(u('commissions'), post_data,
1655
                        content_type='application/json', **s1_headers)
1656
        self.assertEqual(r.status_code, 404)
1657

    
1658
        # release
1659
        commission_request = {
1660
            "provisions": [
1661
                {
1662
                    "holder": user.uuid,
1663
                    "source": "system",
1664
                    "resource": resource11['name'],
1665
                    "quantity": -1
1666
                }
1667
            ]}
1668

    
1669
        post_data = json.dumps(commission_request)
1670
        r = client.post(u('commissions'), post_data,
1671
                        content_type='application/json', **s1_headers)
1672
        self.assertEqual(r.status_code, 201)
1673
        body = json.loads(r.content)
1674
        serial = body['serial']
1675

    
1676
        accept_data = {'accept': ""}
1677
        post_data = json.dumps(accept_data)
1678
        r = client.post(u('commissions/' + str(serial) + '/action'), post_data,
1679
                        content_type='application/json', **s1_headers)
1680
        self.assertEqual(r.status_code, 200)
1681

    
1682
        reject_data = {'reject': ""}
1683
        post_data = json.dumps(accept_data)
1684
        r = client.post(u('commissions/' + str(serial) + '/action'), post_data,
1685
                        content_type='application/json', **s1_headers)
1686
        self.assertEqual(r.status_code, 404)
1687

    
1688
        # force
1689
        commission_request = {
1690
            "force": True,
1691
            "provisions": [
1692
                {
1693
                    "holder": user.uuid,
1694
                    "source": "system",
1695
                    "resource": resource11['name'],
1696
                    "quantity": 100
1697
                }]}
1698

    
1699
        post_data = json.dumps(commission_request)
1700
        r = client.post(u('commissions'), post_data,
1701
                        content_type='application/json', **s1_headers)
1702
        self.assertEqual(r.status_code, 201)
1703
        body = json.loads(r.content)
1704

    
1705
        r = client.get(u('quotas'), **headers)
1706
        self.assertEqual(r.status_code, 200)
1707
        body = json.loads(r.content)
1708
        system_quota = body['system']
1709
        r11 = system_quota[resource11['name']]
1710
        self.assertEqual(r11['usage'], 102)
1711
        self.assertEqual(r11['pending'], 101)