Statistics
| Branch: | Tag: | Revision:

root / snf-astakos-app / astakos / im / tests.py @ aba462a2

History | View | Annotate | Download (70.7 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from contextlib import contextmanager
34

    
35
import copy
36
import datetime
37
import functools
38

    
39
from snf_django.utils.testing import with_settings, override_settings, assertIn
40

    
41
from django.test import TestCase, Client
42
from django.core import mail
43
from django.http import SimpleCookie, HttpRequest, QueryDict
44
from django.utils.importlib import import_module
45
from django.utils import simplejson as json
46

    
47
from astakos.im.activation_backends import *
48
from astakos.im.target.shibboleth import Tokens as ShibbolethTokens
49
from astakos.im.models import *
50
from astakos.im import functions
51
from astakos.im import settings as astakos_settings
52
from astakos.im import forms
53

    
54
from urllib import quote
55
from datetime import timedelta
56

    
57
from astakos.im import messages
58
from astakos.im import auth_providers
59
from astakos.im import quotas
60
from astakos.im import resources
61

    
62
from django.conf import settings
63

    
64

    
65
# set some common settings
66
astakos_settings.EMAILCHANGE_ENABLED = True
67
astakos_settings.RECAPTCHA_ENABLED = False
68

    
69
settings.LOGGING_SETUP['disable_existing_loggers'] = False
70

    
71
# shortcut decorators to override provider settings
72
# e.g. shibboleth_settings(ENABLED=True) will set
73
# ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_ENABLED = True in global synnefo settings
74
prefixes = {'providers': 'AUTH_PROVIDER_',
75
            'shibboleth': 'ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_',
76
            'local': 'ASTAKOS_AUTH_PROVIDER_LOCAL_'}
77
im_settings = functools.partial(with_settings, astakos_settings)
78
shibboleth_settings = functools.partial(with_settings,
79
                                        settings,
80
                                        prefix=prefixes['shibboleth'])
81
localauth_settings = functools.partial(with_settings, settings,
82
                                       prefix=prefixes['local'])
83

    
84

    
85
class AstakosTestClient(Client):
86
    pass
87

    
88

    
89
class ShibbolethClient(AstakosTestClient):
90
    """
91
    A shibboleth agnostic client.
92
    """
93
    VALID_TOKENS = filter(lambda x: not x.startswith("_"),
94
                          dir(ShibbolethTokens))
95

    
96
    def __init__(self, *args, **kwargs):
97
        self.tokens = kwargs.pop('tokens', {})
98
        super(ShibbolethClient, self).__init__(*args, **kwargs)
99

    
100
    def set_tokens(self, **kwargs):
101
        for key, value in kwargs.iteritems():
102
            key = 'SHIB_%s' % key.upper()
103
            if not key in self.VALID_TOKENS:
104
                raise Exception('Invalid shibboleth token')
105

    
106
            self.tokens[key] = value
107

    
108
    def unset_tokens(self, *keys):
109
        for key in keys:
110
            key = 'SHIB_%s' % param.upper()
111
            if key in self.tokens:
112
                del self.tokens[key]
113

    
114
    def reset_tokens(self):
115
        self.tokens = {}
116

    
117
    def get_http_token(self, key):
118
        http_header = getattr(ShibbolethTokens, key)
119
        return http_header
120

    
121
    def request(self, **request):
122
        """
123
        Transform valid shibboleth tokens to http headers
124
        """
125
        for token, value in self.tokens.iteritems():
126
            request[self.get_http_token(token)] = value
127

    
128
        for param in request.keys():
129
            key = 'SHIB_%s' % param.upper()
130
            if key in self.VALID_TOKENS:
131
                request[self.get_http_token(key)] = request[param]
132
                del request[param]
133

    
134
        return super(ShibbolethClient, self).request(**request)
135

    
136

    
137
def get_user_client(username, password="password"):
138
    client = Client()
139
    client.login(username=username, password=password)
140
    return client
141

    
142

    
143
def get_local_user(username, **kwargs):
144
        try:
145
            return AstakosUser.objects.get(email=username)
146
        except:
147
            user_params = {
148
                'username': username,
149
                'email': username,
150
                'is_active': True,
151
                'activation_sent': datetime.now(),
152
                'email_verified': True,
153
                'provider': 'local'
154
            }
155
            user_params.update(kwargs)
156
            user = AstakosUser(**user_params)
157
            user.set_password(kwargs.get('password', 'password'))
158
            user.save()
159
            user.add_auth_provider('local', auth_backend='astakos')
160
            if kwargs.get('is_active', True):
161
                user.is_active = True
162
            else:
163
                user.is_active = False
164
            user.save()
165
            return user
166

    
167

    
168
def get_mailbox(email):
169
    mails = []
170
    for sent_email in mail.outbox:
171
        for recipient in sent_email.recipients():
172
            if email in recipient:
173
                mails.append(sent_email)
174
    return mails
175

    
176

    
177
class ShibbolethTests(TestCase):
178
    """
179
    Testing shibboleth authentication.
180
    """
181

    
182
    fixtures = ['groups']
183

    
184
    def setUp(self):
185
        self.client = ShibbolethClient()
186
        astakos_settings.IM_MODULES = ['local', 'shibboleth']
187
        astakos_settings.MODERATION_ENABLED = True
188

    
189
    @im_settings(FORCE_PROFILE_UPDATE=False)
190
    def test_create_account(self):
191

    
192
        client = ShibbolethClient()
193

    
194
        # shibboleth views validation
195
        # eepn required
196
        r = client.get('/im/login/shibboleth?', follow=True)
197
        self.assertContains(r, messages.SHIBBOLETH_MISSING_EPPN % {
198
            'domain': astakos_settings.BASEURL,
199
            'contact_email': settings.CONTACT_EMAIL
200
        })
201
        client.set_tokens(eppn="kpapeppn")
202

    
203
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = True
204
        # shibboleth user info required
205
        r = client.get('/im/login/shibboleth?', follow=True)
206
        self.assertContains(r, messages.SHIBBOLETH_MISSING_NAME)
207
        astakos_settings.SHIBBOLETH_REQUIRE_NAME_INFO = False
208

    
209
        # shibboleth logged us in
210
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn",
211
                          cn="Kostas Papadimitriou",
212
                          ep_affiliation="Test Affiliation")
213
        r = client.get('/im/login/shibboleth?', follow=True)
214
        token = PendingThirdPartyUser.objects.get().token
215
        self.assertRedirects(r, '/im/signup?third_party_token=%s' % token)
216
        self.assertEqual(r.status_code, 200)
217

    
218
        # a new pending user created
219
        pending_user = PendingThirdPartyUser.objects.get(
220
            third_party_identifier="kpapeppn")
221
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
222
        # keep the token for future use
223
        token = pending_user.token
224
        # from now on no shibboleth headers are sent to the server
225
        client.reset_tokens()
226

    
227
        # this is the old way, it should fail, to avoid pending user take over
228
        r = client.get('/im/shibboleth/signup/%s' % pending_user.username)
229
        self.assertEqual(r.status_code, 404)
230

    
231
        # this is the signup unique url associated with the pending user
232
        # created
233
        r = client.get('/im/signup/?third_party_token=%s' % token)
234
        identifier = pending_user.third_party_identifier
235
        post_data = {'third_party_identifier': identifier,
236
                     'first_name': 'Kostas',
237
                     'third_party_token': token,
238
                     'last_name': 'Mitroglou',
239
                     'provider': 'shibboleth'}
240

    
241
        signup_url = reverse('signup')
242

    
243
        # invlid email
244
        post_data['email'] = 'kpap'
245
        r = client.post(signup_url, post_data)
246
        self.assertContains(r, token)
247

    
248
        # existing email
249
        existing_user = get_local_user('test@test.com')
250
        post_data['email'] = 'test@test.com'
251
        r = client.post(signup_url, post_data)
252
        self.assertContains(r, messages.EMAIL_USED)
253
        existing_user.delete()
254

    
255
        # and finally a valid signup
256
        post_data['email'] = 'kpap@grnet.gr'
257
        r = client.post(signup_url, post_data, follow=True)
258
        self.assertContains(r, messages.NOTIFICATION_SENT)
259

    
260
        # everything is ok in our db
261
        self.assertEqual(AstakosUser.objects.count(), 1)
262
        self.assertEqual(AstakosUserAuthProvider.objects.count(), 1)
263
        self.assertEqual(PendingThirdPartyUser.objects.count(), 0)
264

    
265
        # provider info stored
266
        provider = AstakosUserAuthProvider.objects.get(module="shibboleth")
267
        self.assertEqual(provider.affiliation, 'Test Affiliation')
268
        self.assertEqual(provider.info, {u'email': u'kpap@grnet.gr',
269
                                         u'eppn': u'kpapeppn',
270
                                         u'name': u'Kostas Papadimitriou'})
271

    
272
        # lets login (not activated yet)
273
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppn",
274
                          cn="Kostas Papadimitriou", )
275
        r = client.get("/im/login/shibboleth?", follow=True)
276
        self.assertContains(r, 'is pending moderation')
277

    
278
        # admin activates our user
279
        u = AstakosUser.objects.get(username="kpap@grnet.gr")
280
        functions.activate(u)
281
        self.assertEqual(u.is_active, True)
282

    
283
        # we see our profile
284
        r = client.get("/im/login/shibboleth?", follow=True)
285
        self.assertRedirects(r, '/im/landing')
286
        self.assertEqual(r.status_code, 200)
287

    
288
    def test_existing(self):
289
        """
290
        Test adding of third party login to an existing account
291
        """
292

    
293
        # this is our existing user
294
        existing_user = get_local_user('kpap@grnet.gr')
295
        existing_inactive = get_local_user('kpap-inactive@grnet.gr')
296
        existing_inactive.is_active = False
297
        existing_inactive.save()
298

    
299
        existing_unverified = get_local_user('kpap-unverified@grnet.gr')
300
        existing_unverified.is_active = False
301
        existing_unverified.activation_sent = None
302
        existing_unverified.email_verified = False
303
        existing_unverified.is_verified = False
304
        existing_unverified.save()
305

    
306
        client = ShibbolethClient()
307
        # shibboleth logged us in, notice that we use different email
308
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
309
                          cn="Kostas Papadimitriou", )
310
        r = client.get("/im/login/shibboleth?", follow=True)
311

    
312
        # a new pending user created
313
        pending_user = PendingThirdPartyUser.objects.get()
314
        token = pending_user.token
315
        self.assertEqual(PendingThirdPartyUser.objects.count(), 1)
316
        pending_key = pending_user.token
317
        client.reset_tokens()
318
        self.assertRedirects(r, "/im/signup?third_party_token=%s" % token)
319

    
320
        form = r.context['login_form']
321
        signupdata = copy.copy(form.initial)
322
        signupdata['email'] = 'kpap@grnet.gr'
323
        signupdata['third_party_token'] = token
324
        signupdata['provider'] = 'shibboleth'
325
        signupdata.pop('id', None)
326

    
327
        # the email exists to another user
328
        r = client.post("/im/signup", signupdata)
329
        self.assertContains(r, "There is already an account with this email "
330
                               "address")
331
        # change the case, still cannot create
332
        signupdata['email'] = 'KPAP@grnet.GR'
333
        r = client.post("/im/signup", signupdata)
334
        self.assertContains(r, "There is already an account with this email "
335
                               "address")
336
        # inactive user
337
        signupdata['email'] = 'KPAP-inactive@grnet.GR'
338
        r = client.post("/im/signup", signupdata)
339
        self.assertContains(r, "There is already an account with this email "
340
                               "address")
341

    
342
        # unverified user, this should pass, old entry will be deleted
343
        signupdata['email'] = 'KAPAP-unverified@grnet.GR'
344
        r = client.post("/im/signup", signupdata)
345

    
346
        post_data = {'password': 'password',
347
                     'username': 'kpap@grnet.gr'}
348
        r = client.post('/im/local', post_data, follow=True)
349
        self.assertTrue(r.context['request'].user.is_authenticated())
350
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
351
                          cn="Kostas Papadimitriou", )
352
        r = client.get("/im/login/shibboleth?", follow=True)
353
        self.assertContains(r, "enabled for this account")
354
        client.reset_tokens()
355

    
356
        user = existing_user
357
        self.assertTrue(user.has_auth_provider('shibboleth'))
358
        self.assertTrue(user.has_auth_provider('local',
359
                                               auth_backend='astakos'))
360
        client.logout()
361

    
362
        # look Ma, i can login with both my shibboleth and local account
363
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
364
                          cn="Kostas Papadimitriou")
365
        r = client.get("/im/login/shibboleth?", follow=True)
366
        self.assertTrue(r.context['request'].user.is_authenticated())
367
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
368
        self.assertRedirects(r, '/im/landing')
369
        self.assertEqual(r.status_code, 200)
370
        client.logout()
371
        client.reset_tokens()
372

    
373
        # logged out
374
        r = client.get("/im/profile", follow=True)
375
        self.assertFalse(r.context['request'].user.is_authenticated())
376

    
377
        # login with local account also works
378
        post_data = {'password': 'password',
379
                     'username': 'kpap@grnet.gr'}
380
        r = self.client.post('/im/local', post_data, follow=True)
381
        self.assertTrue(r.context['request'].user.is_authenticated())
382
        self.assertTrue(r.context['request'].user.email == "kpap@grnet.gr")
383
        self.assertRedirects(r, '/im/landing')
384
        self.assertEqual(r.status_code, 200)
385

    
386
        # cannot add the same eppn
387
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn",
388
                          cn="Kostas Papadimitriou", )
389
        r = client.get("/im/login/shibboleth?", follow=True)
390
        self.assertRedirects(r, '/im/landing')
391
        self.assertTrue(r.status_code, 200)
392
        self.assertEquals(existing_user.auth_providers.count(), 2)
393

    
394
        # only one allowed by default
395
        client.set_tokens(mail="secondary@shibboleth.gr", eppn="kpapeppn2",
396
                          cn="Kostas Papadimitriou", ep_affiliation="affil2")
397
        prov = auth_providers.get_provider('shibboleth')
398
        r = client.get("/im/login/shibboleth?", follow=True)
399
        self.assertContains(r, "Failed to add")
400
        self.assertRedirects(r, '/im/profile')
401
        self.assertTrue(r.status_code, 200)
402
        self.assertEquals(existing_user.auth_providers.count(), 2)
403
        client.logout()
404
        client.reset_tokens()
405

    
406
        # cannot login with another eppn
407
        client.set_tokens(mail="kpap@grnet.gr", eppn="kpapeppninvalid",
408
                          cn="Kostas Papadimitriou")
409
        r = client.get("/im/login/shibboleth?", follow=True)
410
        self.assertFalse(r.context['request'].user.is_authenticated())
411

    
412
        # cannot
413

    
414
        # lets remove local password
415
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
416
                                       email="kpap@grnet.gr")
417
        remove_local_url = user.get_auth_provider('local').get_remove_url
418
        remove_shibbo_url = user.get_auth_provider('shibboleth',
419
                                                   'kpapeppn').get_remove_url
420
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
421
                          cn="Kostas Papadimtriou")
422
        r = client.get("/im/login/shibboleth?", follow=True)
423
        client.reset_tokens()
424

    
425
        # TODO: this view should use POST
426
        r = client.get(remove_local_url)
427
        # 2 providers left
428
        self.assertEqual(user.auth_providers.count(), 1)
429
        # cannot remove last provider
430
        r = client.get(remove_shibbo_url)
431
        self.assertEqual(r.status_code, 403)
432
        self.client.logout()
433

    
434
        # cannot login using local credentials (notice we use another client)
435
        post_data = {'password': 'password',
436
                     'username': 'kpap@grnet.gr'}
437
        r = self.client.post('/im/local', post_data, follow=True)
438
        self.assertFalse(r.context['request'].user.is_authenticated())
439

    
440
        # we can reenable the local provider by setting a password
441
        r = client.get("/im/password_change", follow=True)
442
        r = client.post("/im/password_change", {'new_password1': '111',
443
                                                'new_password2': '111'},
444
                        follow=True)
445
        user = r.context['request'].user
446
        self.assertTrue(user.has_auth_provider('local'))
447
        self.assertTrue(user.has_auth_provider('shibboleth'))
448
        self.assertTrue(user.check_password('111'))
449
        self.assertTrue(user.has_usable_password())
450
        self.client.logout()
451

    
452
        # now we can login
453
        post_data = {'password': '111',
454
                     'username': 'kpap@grnet.gr'}
455
        r = self.client.post('/im/local', post_data, follow=True)
456
        self.assertTrue(r.context['request'].user.is_authenticated())
457

    
458
        client.reset_tokens()
459

    
460
        # we cannot take over another shibboleth identifier
461
        user2 = get_local_user('another@grnet.gr')
462
        user2.add_auth_provider('shibboleth', identifier='existingeppn')
463
        # login
464
        client.set_tokens(mail="kpap@shibboleth.gr", eppn="kpapeppn",
465
                          cn="Kostas Papadimitriou")
466
        r = client.get("/im/login/shibboleth?", follow=True)
467
        # try to assign existing shibboleth identifier of another user
468
        client.set_tokens(mail="kpap_second@shibboleth.gr",
469
                          eppn="existingeppn", cn="Kostas Papadimitriou")
470
        r = client.get("/im/login/shibboleth?", follow=True)
471
        self.assertContains(r, "this account is already assigned")
472

    
473

    
474
class TestLocal(TestCase):
475

    
476
    fixtures = ['groups']
477

    
478
    def setUp(self):
479
        settings.ADMINS = (('admin', 'support@cloud.grnet.gr'),)
480
        settings.SERVER_EMAIL = 'no-reply@grnet.gr'
481
        self._orig_moderation = astakos_settings.MODERATION_ENABLED
482
        settings.ASTAKOS_MODERATION_ENABLED = True
483

    
484
    def tearDown(self):
485
        settings.ASTAKOS_MODERATION_ENABLED = self._orig_moderation
486

    
487
    def test_no_moderation(self):
488
        # disable moderation
489
        astakos_settings.MODERATION_ENABLED = False
490

    
491
        # create a new user
492
        r = self.client.get("/im/signup")
493
        self.assertEqual(r.status_code, 200)
494
        data = {'email': 'kpap@grnet.gr', 'password1': 'password',
495
                'password2': 'password', 'first_name': 'Kostas',
496
                'last_name': 'Mitroglou', 'provider': 'local'}
497
        r = self.client.post("/im/signup", data)
498

    
499
        # user created
500
        self.assertEqual(AstakosUser.objects.count(), 1)
501
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
502
                                       email="kpap@grnet.gr")
503
        self.assertEqual(user.username, 'kpap@grnet.gr')
504
        self.assertEqual(user.has_auth_provider('local'), True)
505
        self.assertFalse(user.is_active)
506

    
507
        # user (but not admin) gets notified
508
        self.assertEqual(len(get_mailbox('support@cloud.grnet.gr')), 0)
509
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
510
        astakos_settings.MODERATION_ENABLED = True
511

    
512
    def test_email_case(self):
513
        data = {
514
            'email': 'kPap@grnet.gr',
515
            'password1': '1234',
516
            'password2': '1234'
517
        }
518

    
519
        form = forms.LocalUserCreationForm(data)
520
        self.assertTrue(form.is_valid())
521
        user = form.save()
522
        form.store_user(user, {})
523

    
524
        u = AstakosUser.objects.get()
525
        self.assertEqual(u.email, 'kPap@grnet.gr')
526
        self.assertEqual(u.username, 'kpap@grnet.gr')
527
        u.is_active = True
528
        u.email_verified = True
529
        u.save()
530

    
531
        data = {'username': 'kpap@grnet.gr', 'password': '1234'}
532
        login = forms.LoginForm(data=data)
533
        self.assertTrue(login.is_valid())
534

    
535
        data = {'username': 'KpaP@grnet.gr', 'password': '1234'}
536
        login = forms.LoginForm(data=data)
537
        self.assertTrue(login.is_valid())
538

    
539
        data = {
540
            'email': 'kpap@grnet.gr',
541
            'password1': '1234',
542
            'password2': '1234'
543
        }
544
        form = forms.LocalUserCreationForm(data)
545
        self.assertFalse(form.is_valid())
546

    
547
    @im_settings(HELPDESK=(('support', 'support@synnefo.org'),))
548
    @im_settings(FORCE_PROFILE_UPDATE=False)
549
    def test_local_provider(self):
550
        self.helpdesk_email = astakos_settings.HELPDESK[0][1]
551
        # enable moderation
552
        astakos_settings.MODERATION_ENABLED = True
553

    
554
        # create a user
555
        r = self.client.get("/im/signup")
556
        self.assertEqual(r.status_code, 200)
557
        data = {'email': 'kpap@grnet.gr', 'password1': 'password',
558
                'password2': 'password', 'first_name': 'Kostas',
559
                'last_name': 'Mitroglou', 'provider': 'local'}
560
        r = self.client.post("/im/signup", data)
561

    
562
        # user created
563
        self.assertEqual(AstakosUser.objects.count(), 1)
564
        user = AstakosUser.objects.get(username="kpap@grnet.gr",
565
                                       email="kpap@grnet.gr")
566
        self.assertEqual(user.username, 'kpap@grnet.gr')
567
        self.assertEqual(user.has_auth_provider('local'), True)
568
        self.assertFalse(user.is_active)  # not activated
569
        self.assertFalse(user.email_verified)  # not verified
570
        self.assertFalse(user.activation_sent)  # activation automatically sent
571

    
572
        # admin gets notified and activates the user from the command line
573
        self.assertEqual(len(get_mailbox(self.helpdesk_email)), 1)
574
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
575
                                           'password': 'password'})
576
        self.assertContains(r, messages.NOTIFICATION_SENT)
577
        functions.send_activation(user)
578

    
579
        # user activation fields updated and user gets notified via email
580
        user = AstakosUser.objects.get(pk=user.pk)
581
        self.assertTrue(user.activation_sent)
582
        self.assertFalse(user.email_verified)
583
        self.assertFalse(user.is_active)
584
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 1)
585

    
586
        # user forgot she got registered and tries to submit registration
587
        # form. Notice the upper case in email
588
        data = {'email': 'KPAP@grnet.gr', 'password1': 'password',
589
                'password2': 'password', 'first_name': 'Kostas',
590
                'last_name': 'Mitroglou', 'provider': 'local'}
591
        r = self.client.post("/im/signup", data, follow=True)
592
        self.assertRedirects(r, reverse('index'))
593
        self.assertContains(r, messages.NOTIFICATION_SENT)
594

    
595
        user = AstakosUser.objects.get()
596
        functions.send_activation(user)
597

    
598
        # previous user replaced
599
        self.assertTrue(user.activation_sent)
600
        self.assertFalse(user.email_verified)
601
        self.assertFalse(user.is_active)
602
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 1)
603

    
604
        # hmmm, email exists; lets request a password change
605
        r = self.client.get('/im/local/password_reset')
606
        self.assertEqual(r.status_code, 200)
607
        data = {'email': 'kpap@grnet.gr'}
608
        r = self.client.post('/im/local/password_reset', data, follow=True)
609
        # she can't because account is not active yet
610
        self.assertContains(r, 'pending activation')
611

    
612
        # moderation is enabled and an activation email has already been sent
613
        # so user can trigger resend of the activation email
614
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
615
        self.assertContains(r, 'has been sent to your email address.')
616
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 2)
617

    
618
        # also she cannot login
619
        data = {'username': 'kpap@grnet.gr', 'password': 'password'}
620
        r = self.client.post('/im/local', data, follow=True)
621
        self.assertContains(r, 'Resend activation')
622
        self.assertFalse(r.context['request'].user.is_authenticated())
623
        self.assertFalse('_pithos2_a' in self.client.cookies)
624

    
625
        # user sees the message and resends activation
626
        r = self.client.get('/im/send/activation/%d' % user.pk, follow=True)
627
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 3)
628

    
629
        # switch back moderation setting
630
        astakos_settings.MODERATION_ENABLED = True
631
        r = self.client.get(user.get_activation_url(), follow=True)
632
        self.assertRedirects(r, "/im/landing")
633
        r = self.client.get('/im/profile', follow=True)
634
        self.assertTrue(r.context['request'].user.is_authenticated())
635
        self.assertTrue('_pithos2_a' in self.client.cookies)
636
        self.assertContains(r, "KPAP@grnet.gr")
637
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 4)
638

    
639
        user = AstakosUser.objects.get(pk=user.pk)
640
        # user activated and logged in, token cookie set
641
        self.assertTrue(r.context['request'].user.is_authenticated())
642
        self.assertTrue('_pithos2_a' in self.client.cookies)
643
        cookies = self.client.cookies
644
        self.assertTrue(quote(user.auth_token) in
645
                        cookies.get('_pithos2_a').value)
646
        r = self.client.get('/im/logout', follow=True)
647
        r = self.client.get('/im/')
648
        # user logged out, token cookie removed
649
        self.assertFalse(r.context['request'].user.is_authenticated())
650
        self.assertFalse(self.client.cookies.get('_pithos2_a').value)
651

    
652
        #https://docs.djangoproject.com/en/dev/topics/testing/#persistent-state
653
        del self.client.cookies['_pithos2_a']
654

    
655
        # user can login
656
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
657
                                           'password': 'password'},
658
                             follow=True)
659
        self.assertTrue(r.context['request'].user.is_authenticated())
660
        self.assertTrue('_pithos2_a' in self.client.cookies)
661
        cookies = self.client.cookies
662
        self.assertTrue(quote(user.auth_token) in
663
                        cookies.get('_pithos2_a').value)
664
        self.client.get('/im/logout', follow=True)
665

    
666
        # user forgot password
667
        old_pass = user.password
668
        r = self.client.get('/im/local/password_reset')
669
        self.assertEqual(r.status_code, 200)
670
        r = self.client.post('/im/local/password_reset', {'email':
671
                                                          'kpap@grnet.gr'})
672
        self.assertEqual(r.status_code, 302)
673
        # email sent
674
        self.assertEqual(len(get_mailbox('KPAP@grnet.gr')), 5)
675

    
676
        # user visits change password link
677
        # "Refresh" user because created url is based on last_login timestamp
678
        user = AstakosUser.objects.get(pk=user.pk)
679
        r = self.client.get(user.get_password_reset_url())
680
        r = self.client.post(user.get_password_reset_url(),
681
                             {'new_password1': 'newpass',
682
                              'new_password2': 'newpass'})
683

    
684
        user = AstakosUser.objects.get(pk=user.pk)
685
        self.assertNotEqual(old_pass, user.password)
686

    
687
        # old pass is not usable
688
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
689
                                           'password': 'password'})
690
        self.assertContains(r, 'Please enter a correct username and password')
691
        r = self.client.post('/im/local', {'username': 'kpap@grnet.gr',
692
                                           'password': 'newpass'},
693
                             follow=True)
694
        self.assertTrue(r.context['request'].user.is_authenticated())
695
        self.client.logout()
696

    
697
        # tests of special local backends
698
        user = AstakosUser.objects.get(pk=user.pk)
699
        user.auth_providers.filter(module='local').update(auth_backend='ldap')
700
        user.save()
701

    
702
        # non astakos local backends do not support password reset
703
        r = self.client.get('/im/local/password_reset')
704
        self.assertEqual(r.status_code, 200)
705
        r = self.client.post('/im/local/password_reset', {'email':
706
                                                          'kpap@grnet.gr'})
707
        # she can't because account is not active yet
708
        self.assertContains(r, "Changing password is not")
709

    
710

    
711
class UserActionsTests(TestCase):
712

    
713
    def test_email_change(self):
714
        # to test existing email validation
715
        get_local_user('existing@grnet.gr')
716

    
717
        # local user
718
        user = get_local_user('kpap@grnet.gr')
719

    
720
        # login as kpap
721
        self.client.login(username='kpap@grnet.gr', password='password')
722
        r = self.client.get('/im/profile', follow=True)
723
        user = r.context['request'].user
724
        self.assertTrue(user.is_authenticated())
725

    
726
        # change email is enabled
727
        r = self.client.get('/im/email_change')
728
        self.assertEqual(r.status_code, 200)
729
        self.assertFalse(user.email_change_is_pending())
730

    
731
        # request email change to an existing email fails
732
        data = {'new_email_address': 'existing@grnet.gr'}
733
        r = self.client.post('/im/email_change', data)
734
        self.assertContains(r, messages.EMAIL_USED)
735

    
736
        # proper email change
737
        data = {'new_email_address': 'kpap@gmail.com'}
738
        r = self.client.post('/im/email_change', data, follow=True)
739
        self.assertRedirects(r, '/im/profile')
740
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
741
        change1 = EmailChange.objects.get()
742

    
743
        # user sees a warning
744
        r = self.client.get('/im/email_change')
745
        self.assertEqual(r.status_code, 200)
746
        self.assertContains(r, messages.PENDING_EMAIL_CHANGE_REQUEST)
747
        self.assertTrue(user.email_change_is_pending())
748

    
749
        # link was sent
750
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 0)
751
        self.assertEqual(len(get_mailbox('kpap@gmail.com')), 1)
752

    
753
        # proper email change
754
        data = {'new_email_address': 'kpap@yahoo.com'}
755
        r = self.client.post('/im/email_change', data, follow=True)
756
        self.assertRedirects(r, '/im/profile')
757
        self.assertContains(r, messages.EMAIL_CHANGE_REGISTERED)
758
        self.assertEqual(len(get_mailbox('kpap@grnet.gr')), 0)
759
        self.assertEqual(len(get_mailbox('kpap@yahoo.com')), 1)
760
        change2 = EmailChange.objects.get()
761

    
762
        r = self.client.get(change1.get_url())
763
        self.assertEquals(r.status_code, 302)
764
        self.client.logout()
765

    
766
        r = self.client.post('/im/local?next=' + change2.get_url(),
767
                             {'username': 'kpap@grnet.gr',
768
                              'password': 'password',
769
                              'next': change2.get_url()},
770
                             follow=True)
771
        self.assertRedirects(r, '/im/profile')
772
        user = r.context['request'].user
773
        self.assertEquals(user.email, 'kpap@yahoo.com')
774
        self.assertEquals(user.username, 'kpap@yahoo.com')
775

    
776
        self.client.logout()
777
        r = self.client.post('/im/local?next=' + change2.get_url(),
778
                             {'username': 'kpap@grnet.gr',
779
                              'password': 'password',
780
                              'next': change2.get_url()},
781
                             follow=True)
782
        self.assertContains(r, "Please enter a correct username and password")
783
        self.assertEqual(user.emailchanges.count(), 0)
784

    
785

    
786
class TestAuthProviderViews(TestCase):
787

    
788
    @shibboleth_settings(CREATION_GROUPS_POLICY=['academic-login'])
789
    @shibboleth_settings(AUTOMODERATE_POLICY=True)
790
    @im_settings(IM_MODULES=['shibboleth', 'local'])
791
    @im_settings(MODERATION_ENABLED=True)
792
    @im_settings(FORCE_PROFILE_UPDATE=False)
793
    def test_user(self):
794
        Profile = AuthProviderPolicyProfile
795
        Pending = PendingThirdPartyUser
796
        User = AstakosUser
797

    
798
        User.objects.create(email="newuser@grnet.gr")
799
        get_local_user("olduser@grnet.gr")
800
        cl_olduser = ShibbolethClient()
801
        get_local_user("olduser2@grnet.gr")
802
        ShibbolethClient()
803
        cl_newuser = ShibbolethClient()
804
        cl_newuser2 = Client()
805

    
806
        academic_group, created = Group.objects.get_or_create(
807
            name='academic-login')
808
        academic_users = academic_group.user_set
809
        assert created
810
        policy_only_academic = Profile.objects.add_policy('academic_strict',
811
                                                          'shibboleth',
812
                                                          academic_group,
813
                                                          exclusive=True,
814
                                                          login=False,
815
                                                          add=False)
816

    
817

    
818
        # new academic user
819
        self.assertFalse(academic_users.filter(email='newuser@grnet.gr'))
820
        cl_newuser.set_tokens(eppn="newusereppn")
821
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
822
        pending = Pending.objects.get()
823
        identifier = pending.third_party_identifier
824
        signup_data = {'third_party_identifier': identifier,
825
                       'first_name': 'Academic',
826
                       'third_party_token': pending.token,
827
                       'last_name': 'New User',
828
                       'provider': 'shibboleth'}
829
        r = cl_newuser.post('/im/signup', signup_data)
830
        self.assertContains(r, "This field is required", )
831
        signup_data['email'] = 'olduser@grnet.gr'
832
        r = cl_newuser.post('/im/signup', signup_data)
833
        self.assertContains(r, "already an account with this email", )
834
        signup_data['email'] = 'newuser@grnet.gr'
835
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
836
        r = cl_newuser.post('/im/signup', signup_data, follow=True)
837
        self.assertEqual(r.status_code, 404)
838
        newuser = User.objects.get(email="newuser@grnet.gr")
839
        activation_link = newuser.get_activation_url()
840
        self.assertTrue(academic_users.get(email='newuser@grnet.gr'))
841

    
842
        # new non-academic user
843
        signup_data = {'first_name': 'Non Academic',
844
                       'last_name': 'New User',
845
                       'provider': 'local',
846
                       'password1': 'password',
847
                       'password2': 'password'}
848
        signup_data['email'] = 'olduser@grnet.gr'
849
        r = cl_newuser2.post('/im/signup', signup_data)
850
        self.assertContains(r, 'There is already an account with this '
851
                               'email address')
852
        signup_data['email'] = 'newuser@grnet.gr'
853
        r = cl_newuser2.post('/im/signup/', signup_data)
854
        self.assertFalse(academic_users.filter(email='newuser@grnet.gr'))
855
        r = self.client.get(activation_link, follow=True)
856
        self.assertEqual(r.status_code, 400)
857
        newuser = User.objects.get(email="newuser@grnet.gr")
858
        self.assertFalse(newuser.activation_sent)
859
        r = self.client.get(newuser.get_activation_url(), follow=True)
860
        self.assertContains(r, "pending moderation")
861

    
862
        self.assertFalse(academic_users.filter(email='newuser@grnet.gr'))
863
        r = cl_newuser.get('/im/login/shibboleth?', follow=True)
864
        pending = Pending.objects.get()
865
        identifier = pending.third_party_identifier
866
        signup_data = {'third_party_identifier': identifier,
867
                       'first_name': 'Academic',
868
                       'third_party_token': pending.token,
869
                       'last_name': 'New User',
870
                       'provider': 'shibboleth'}
871
        signup_data['email'] = 'newuser@grnet.gr'
872
        r = cl_newuser.post('/im/signup', signup_data)
873
        newuser = User.objects.get(email="newuser@grnet.gr")
874
        self.assertTrue(newuser.activation_sent)
875
        activation_link = newuser.get_activation_url()
876
        self.assertTrue(academic_users.get(email='newuser@grnet.gr'))
877
        r = cl_newuser.get(newuser.get_activation_url(), follow=True)
878
        self.assertRedirects(r, '/im/landing')
879
        newuser = User.objects.get(email="newuser@grnet.gr")
880
        self.assertEqual(newuser.is_active, True)
881
        self.assertEqual(newuser.email_verified, True)
882
        cl_newuser.logout()
883

    
884
        # cannot reactivate if suspended
885
        newuser.is_active = False
886
        newuser.save()
887
        r = cl_newuser.get(newuser.get_activation_url())
888
        newuser = User.objects.get(email="newuser@grnet.gr")
889
        self.assertFalse(newuser.is_active)
890

    
891
        # release suspension
892
        newuser.is_active = True
893
        newuser.save()
894

    
895
        cl_newuser.get('/im/login/shibboleth?', follow=True)
896
        local = auth.get_provider('local', newuser)
897
        self.assertEqual(local.get_add_policy, False)
898
        self.assertEqual(local.get_login_policy, False)
899
        r = cl_newuser.get(local.get_add_url, follow=True)
900
        self.assertRedirects(r, '/im/profile')
901
        self.assertContains(r, 'disabled for your')
902

    
903
        cl_olduser.login(username='olduser@grnet.gr', password="password")
904
        r = cl_olduser.get('/im/profile', follow=True)
905
        self.assertEqual(r.status_code, 200)
906
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
907
        self.assertContains(r, 'Your request is missing a unique token')
908
        cl_olduser.set_tokens(eppn="newusereppn")
909
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
910
        self.assertContains(r, 'is already assigned to another user')
911
        cl_olduser.set_tokens(eppn="oldusereppn")
912
        r = cl_olduser.get('/im/login/shibboleth?', follow=True)
913
        self.assertContains(r, 'Academic login enabled for this account')
914

    
915
        user = User.objects.get(email="olduser@grnet.gr")
916
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
917
        local_provider = user.get_auth_provider('local')
918
        self.assertEqual(shib_provider.get_remove_policy, True)
919
        self.assertEqual(local_provider.get_remove_policy, True)
920

    
921

    
922
        policy_only_academic = Profile.objects.add_policy('academic_strict2',
923
                                                          'shibboleth',
924
                                                          academic_group,
925
                                                          remove=False)
926
        user.groups.add(academic_group)
927
        shib_provider = user.get_auth_provider('shibboleth', 'oldusereppn')
928
        local_provider = user.get_auth_provider('local')
929
        self.assertEqual(shib_provider.get_remove_policy, False)
930
        self.assertEqual(local_provider.get_remove_policy, True)
931
        self.assertEqual(local_provider.get_login_policy, False)
932

    
933
        cl_olduser.logout()
934
        login_data = {'username': 'olduser@grnet.gr', 'password': 'password'}
935
        r = cl_olduser.post('/im/local', login_data, follow=True)
936
        self.assertContains(r, "href='/im/login/shibboleth'>Academic login")
937

    
938

    
939
class TestAuthProvidersAPI(TestCase):
940
    """
941
    Test auth_providers module API
942
    """
943

    
944
    @im_settings(IM_MODULES=['local', 'shibboleth'])
945
    def test_create(self):
946
        user = AstakosUser.objects.create(email="kpap@grnet.gr")
947
        user2 = AstakosUser.objects.create(email="kpap2@grnet.gr")
948

    
949
        module = 'shibboleth'
950
        identifier = 'SHIB_UUID'
951
        provider_params = {
952
            'affiliation': 'UNIVERSITY',
953
            'info': {'age': 27}
954
        }
955
        provider = auth.get_provider(module, user2, identifier,
956
                                     **provider_params)
957
        provider.add_to_user()
958
        provider = auth.get_provider(module, user, identifier,
959
                                     **provider_params)
960
        provider.add_to_user()
961
        user.email_verified = True
962
        user.save()
963
        self.assertRaises(Exception, provider.add_to_user)
964
        provider = user.get_auth_provider(module, identifier)
965
        self.assertEqual(user.get_auth_provider(
966
            module, identifier)._instance.info.get('age'), 27)
967

    
968
        module = 'local'
969
        identifier = None
970
        provider_params = {'auth_backend': 'ldap', 'info':
971
                          {'office': 'A1'}}
972
        provider = auth.get_provider(module, user, identifier,
973
                                     **provider_params)
974
        provider.add_to_user()
975
        self.assertFalse(provider.get_add_policy)
976
        self.assertRaises(Exception, provider.add_to_user)
977

    
978
        shib = user.get_auth_provider('shibboleth',
979
                                      'SHIB_UUID')
980
        self.assertTrue(shib.get_remove_policy)
981

    
982
        local = user.get_auth_provider('local')
983
        self.assertTrue(local.get_remove_policy)
984

    
985
        local.remove_from_user()
986
        self.assertFalse(shib.get_remove_policy)
987
        self.assertRaises(Exception, shib.remove_from_user)
988

    
989
        provider = user.get_auth_providers()[0]
990
        self.assertRaises(Exception, provider.add_to_user)
991

    
992
    @im_settings(IM_MODULES=['local', 'shibboleth'])
993
    @shibboleth_settings(ADD_GROUPS_POLICY=['group1', 'group2'])
994
    @shibboleth_settings(CREATION_GROUPS_POLICY=['group-create', 'group1',
995
                                                 'group2'])
996
    @localauth_settings(ADD_GROUPS_POLICY=['localgroup'])
997
    @localauth_settings(CREATION_GROUPS_POLICY=['localgroup-create',
998
                                                'group-create'])
999
    def test_add_groups(self):
1000
        user = AstakosUser.objects.create(email="kpap@grnet.gr")
1001
        provider = auth.get_provider('shibboleth', user, 'test123')
1002
        provider.add_to_user()
1003
        user = AstakosUser.objects.get()
1004
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1005
                         sorted([u'group1', u'group2', u'group-create']))
1006

    
1007
        local = auth.get_provider('local', user)
1008
        local.add_to_user()
1009
        provider = user.get_auth_provider('shibboleth')
1010
        self.assertEqual(provider.get_add_groups_policy, ['group1', 'group2'])
1011
        provider.remove_from_user()
1012
        user = AstakosUser.objects.get()
1013
        self.assertEqual(len(user.get_auth_providers()), 1)
1014
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1015
                         sorted([u'group-create', u'localgroup']))
1016

    
1017
        local = user.get_auth_provider('local')
1018
        self.assertRaises(Exception, local.remove_from_user)
1019
        provider = auth.get_provider('shibboleth', user, 'test123')
1020
        provider.add_to_user()
1021
        user = AstakosUser.objects.get()
1022
        self.assertEqual(sorted(user.groups.values_list('name', flat=True)),
1023
                         sorted([u'group-create', u'group1', u'group2',
1024
                                 u'localgroup']))
1025

    
1026
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1027
    def test_policies(self):
1028
        group_old, created = Group.objects.get_or_create(name='olduser')
1029

    
1030
        astakos_settings.MODERATION_ENABLED = True
1031
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATION_GROUPS_POLICY = \
1032
            ['academic-user']
1033
        settings.ASTAKOS_AUTH_PROVIDER_GOOGLE_ADD_GROUPS_POLICY = \
1034
            ['google-user']
1035

    
1036
        user = AstakosUser.objects.create(email="kpap@grnet.gr")
1037
        user.groups.add(group_old)
1038
        user.add_auth_provider('local')
1039

    
1040
        user2 = AstakosUser.objects.create(email="kpap2@grnet.gr")
1041
        user2.add_auth_provider('shibboleth', identifier='shibid')
1042

    
1043
        user3 = AstakosUser.objects.create(email="kpap3@grnet.gr")
1044
        user3.groups.add(group_old)
1045
        user3.add_auth_provider('local')
1046
        user3.add_auth_provider('shibboleth', identifier='1234')
1047

    
1048
        self.assertTrue(user2.groups.get(name='academic-user'))
1049
        self.assertFalse(user2.groups.filter(name='olduser').count())
1050

    
1051
        local = auth_providers.get_provider('local')
1052
        self.assertTrue(local.get_add_policy)
1053

    
1054
        academic_group = Group.objects.get(name='academic-user')
1055
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1056
                                                     academic_group,
1057
                                                     exclusive=True,
1058
                                                     add=False,
1059
                                                     login=False)
1060
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1061
                                                     academic_group,
1062
                                                     exclusive=True,
1063
                                                     login=False,
1064
                                                     add=False)
1065
        # no duplicate entry gets created
1066
        self.assertEqual(academic_group.authpolicy_profiles.count(), 1)
1067

    
1068
        self.assertEqual(user2.authpolicy_profiles.count(), 0)
1069
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1070
                                                     user2,
1071
                                                     remove=False)
1072
        self.assertEqual(user2.authpolicy_profiles.count(), 1)
1073

    
1074
        local = auth_providers.get_provider('local', user2)
1075
        google = auth_providers.get_provider('google', user2)
1076
        shibboleth = auth_providers.get_provider('shibboleth', user2)
1077
        self.assertTrue(shibboleth.get_login_policy)
1078
        self.assertFalse(shibboleth.get_remove_policy)
1079
        self.assertFalse(local.get_add_policy)
1080
        self.assertFalse(local.get_add_policy)
1081
        self.assertFalse(google.get_add_policy)
1082

    
1083
        user2.groups.remove(Group.objects.get(name='academic-user'))
1084
        self.assertTrue(local.get_add_policy)
1085
        self.assertTrue(google.get_add_policy)
1086
        user2.groups.add(Group.objects.get(name='academic-user'))
1087

    
1088
        AuthProviderPolicyProfile.objects.add_policy('academic', 'shibboleth',
1089
                                                     user2,
1090
                                                     exclusive=True,
1091
                                                     add=True)
1092
        self.assertTrue(local.get_add_policy)
1093
        self.assertTrue(google.get_add_policy)
1094

    
1095
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_AUTOMODERATE_POLICY = True
1096
        self.assertFalse(local.get_automoderate_policy)
1097
        self.assertFalse(google.get_automoderate_policy)
1098
        self.assertTrue(shibboleth.get_automoderate_policy)
1099

    
1100
        for s in ['SHIBBOLETH_CREATION_GROUPS_POLICY',
1101
                  'GOOGLE_ADD_GROUPS_POLICY']:
1102
            delattr(settings, 'ASTAKOS_AUTH_PROVIDER_%s' % s)
1103

    
1104

    
1105
    @shibboleth_settings(CREATE_POLICY=True)
1106
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1107
    def test_create_http(self):
1108
        # this should be wrapped inside a transaction
1109
        user = AstakosUser(email="test@test.com")
1110
        user.save()
1111
        provider = auth_providers.get_provider('shibboleth', user,
1112
                                               'test@academia.test')
1113
        provider.add_to_user()
1114
        user.get_auth_provider('shibboleth', 'test@academia.test')
1115
        provider = auth_providers.get_provider('local', user)
1116
        provider.add_to_user()
1117
        user.get_auth_provider('local')
1118

    
1119
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = False
1120
        user = AstakosUser(email="test2@test.com")
1121
        user.save()
1122
        provider = auth_providers.get_provider('shibboleth', user,
1123
                                               'test@shibboleth.com',
1124
                                               **{'info': {'name':
1125
                                                                'User Test'}})
1126
        self.assertFalse(provider.get_create_policy)
1127
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_CREATE_POLICY = True
1128
        self.assertTrue(provider.get_create_policy)
1129
        academic = provider.add_to_user()
1130

    
1131
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1132
    @shibboleth_settings(LIMIT_POLICY=2)
1133
    def test_policies(self):
1134
        user = get_local_user('kpap@grnet.gr')
1135
        user.add_auth_provider('shibboleth', identifier='1234')
1136
        user.add_auth_provider('shibboleth', identifier='12345')
1137

    
1138
        # default limit is 1
1139
        local = user.get_auth_provider('local')
1140
        self.assertEqual(local.get_add_policy, False)
1141

    
1142
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 3
1143
        academic = user.get_auth_provider('shibboleth',
1144
                                          identifier='1234')
1145
        self.assertEqual(academic.get_add_policy, False)
1146
        newacademic = auth_providers.get_provider('shibboleth', user,
1147
                                                  identifier='123456')
1148
        self.assertEqual(newacademic.get_add_policy, True)
1149
        user.add_auth_provider('shibboleth', identifier='123456')
1150
        self.assertEqual(academic.get_add_policy, False)
1151
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_LIMIT_POLICY = 1
1152

    
1153
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1154
    @shibboleth_settings(LIMIT_POLICY=2)
1155
    def test_messages(self):
1156
        user = get_local_user('kpap@grnet.gr')
1157
        user.add_auth_provider('shibboleth', identifier='1234')
1158
        user.add_auth_provider('shibboleth', identifier='12345')
1159
        provider = auth_providers.get_provider('shibboleth')
1160
        self.assertEqual(provider.get_message('title'), 'Academic')
1161
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = 'New title'
1162
        # regenerate messages cache
1163
        provider = auth_providers.get_provider('shibboleth')
1164
        self.assertEqual(provider.get_message('title'), 'New title')
1165
        self.assertEqual(provider.get_message('login_title'),
1166
                         'New title LOGIN')
1167
        self.assertEqual(provider.get_login_title_msg, 'New title LOGIN')
1168
        self.assertEqual(provider.get_module_icon,
1169
                         settings.MEDIA_URL + 'im/auth/icons/shibboleth.png')
1170
        self.assertEqual(provider.get_module_medium_icon,
1171
                         settings.MEDIA_URL +
1172
                         'im/auth/icons-medium/shibboleth.png')
1173

    
1174
        settings.ASTAKOS_AUTH_PROVIDER_SHIBBOLETH_TITLE = None
1175
        provider = auth_providers.get_provider('shibboleth', user, '12345')
1176
        self.assertEqual(provider.get_method_details_msg,
1177
                         'Account: 12345')
1178
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1179
        self.assertEqual(provider.get_method_details_msg,
1180
                         'Account: 1234')
1181

    
1182
        provider = auth_providers.get_provider('shibboleth', user, '1234')
1183
        self.assertEqual(provider.get_not_active_msg,
1184
                         "'Academic login' is disabled.")
1185

    
1186
    @im_settings(IM_MODULES=['local', 'shibboleth'])
1187
    @shibboleth_settings(LIMIT_POLICY=2)
1188
    def test_templates(self):
1189
        user = get_local_user('kpap@grnet.gr')
1190
        user.add_auth_provider('shibboleth', identifier='1234')
1191
        user.add_auth_provider('shibboleth', identifier='12345')
1192

    
1193
        provider = auth_providers.get_provider('shibboleth')
1194
        self.assertEqual(provider.get_template('login'),
1195
                         'im/auth/shibboleth_login.html')
1196
        provider = auth_providers.get_provider('google')
1197
        self.assertEqual(provider.get_template('login'),
1198
                         'im/auth/generic_login.html')
1199

    
1200

    
1201
class TestProjects(TestCase):
1202
    """
1203
    Test projects.
1204
    """
1205
    def setUp(self):
1206
        # astakos resources
1207
        self.astakos_service = Service.objects.create(name="astakos",
1208
                                                      api_url="/astakos/api/")
1209
        self.resource = Resource.objects.create(name="astakos.pending_app",
1210
                                                uplimit=0,
1211
                                                allow_in_projects=False,
1212
                                                service=self.astakos_service)
1213

    
1214
        # custom service resources
1215
        self.service = Service.objects.create(name="service1",
1216
                                              api_url="http://service.api")
1217
        self.resource = Resource.objects.create(name="service1.resource",
1218
                                                uplimit=100,
1219
                                                service=self.service)
1220
        self.admin = get_local_user("projects-admin@synnefo.org")
1221
        self.admin.uuid = 'uuid1'
1222
        self.admin.save()
1223

    
1224
        self.user = get_local_user("user@synnefo.org")
1225
        self.member = get_local_user("member@synnefo.org")
1226
        self.member2 = get_local_user("member2@synnefo.org")
1227

    
1228
        self.admin_client = get_user_client("projects-admin@synnefo.org")
1229
        self.user_client = get_user_client("user@synnefo.org")
1230
        self.member_client = get_user_client("member@synnefo.org")
1231
        self.member2_client = get_user_client("member2@synnefo.org")
1232

    
1233
        quotas.qh_sync_users(AstakosUser.objects.all())
1234

    
1235
    @im_settings(PROJECT_ADMINS=['uuid1'])
1236
    def test_application_limit(self):
1237
        # user cannot create a project
1238
        r = self.user_client.get(reverse('project_add'), follow=True)
1239
        self.assertRedirects(r, reverse('project_list'))
1240
        self.assertContains(r, "You are not allowed to create a new project")
1241

    
1242
        # but admin can
1243
        r = self.admin_client.get(reverse('project_add'), follow=True)
1244
        self.assertRedirects(r, reverse('project_add'))
1245

    
1246
    @im_settings(PROJECT_ADMINS=['uuid1'])
1247
    def test_allow_in_project(self):
1248
        dfrom = datetime.now()
1249
        dto = datetime.now() + timedelta(days=30)
1250

    
1251
        # astakos.pending_uplimit allow_in_project flag is False
1252
        # we shouldn't be able to create a project application using this
1253
        # resource.
1254
        application_data = {
1255
            'name': 'project.synnefo.org',
1256
            'homepage': 'https://www.synnefo.org',
1257
            'start_date': dfrom.strftime("%Y-%m-%d"),
1258
            'end_date': dto.strftime("%Y-%m-%d"),
1259
            'member_join_policy': 2,
1260
            'member_leave_policy': 1,
1261
            'service1.resource_uplimit': 100,
1262
            'is_selected_service1.resource': "1",
1263
            'astakos.pending_app_uplimit': 100,
1264
            'is_selected_accounts': "1",
1265
            'user': self.user.pk
1266
        }
1267
        form = forms.ProjectApplicationForm(data=application_data)
1268
        # form is invalid
1269
        self.assertEqual(form.is_valid(), False)
1270

    
1271
        del application_data['astakos.pending_app_uplimit']
1272
        del application_data['is_selected_accounts']
1273
        form = forms.ProjectApplicationForm(data=application_data)
1274
        self.assertEqual(form.is_valid(), True)
1275

    
1276
    @im_settings(PROJECT_ADMINS=['uuid1'])
1277
    def test_applications(self):
1278
        # let user have 2 pending applications
1279
        quotas.add_base_quota(self.user, 'astakos.pending_app', 2)
1280

    
1281
        r = self.user_client.get(reverse('project_add'), follow=True)
1282
        self.assertRedirects(r, reverse('project_add'))
1283

    
1284
        # user fills the project application form
1285
        post_url = reverse('project_add') + '?verify=1'
1286
        dfrom = datetime.now()
1287
        dto = datetime.now() + timedelta(days=30)
1288
        application_data = {
1289
            'name': 'project.synnefo.org',
1290
            'homepage': 'https://www.synnefo.org',
1291
            'start_date': dfrom.strftime("%Y-%m-%d"),
1292
            'end_date': dto.strftime("%Y-%m-%d"),
1293
            'member_join_policy': 2,
1294
            'member_leave_policy': 1,
1295
            'service1.resource_uplimit': 100,
1296
            'is_selected_service1.resource': "1",
1297
            'user': self.user.pk
1298
        }
1299
        r = self.user_client.post(post_url, data=application_data, follow=True)
1300
        self.assertEqual(r.status_code, 200)
1301
        self.assertEqual(r.context['form'].is_valid(), True)
1302

    
1303
        # confirm request
1304
        post_url = reverse('project_add') + '?verify=0&edit=0'
1305
        r = self.user_client.post(post_url, data=application_data, follow=True)
1306
        self.assertContains(r, "The project application has been received")
1307
        self.assertRedirects(r, reverse('project_list'))
1308
        self.assertEqual(ProjectApplication.objects.count(), 1)
1309
        app1_id = ProjectApplication.objects.filter().order_by('pk')[0].pk
1310

    
1311
        # create another one
1312
        application_data['name'] = 'project2.synnefo.org'
1313
        r = self.user_client.post(post_url, data=application_data, follow=True)
1314
        app2_id = ProjectApplication.objects.filter().order_by('pk')[1].pk
1315

    
1316
        # no more applications (LIMIT is 2)
1317
        r = self.user_client.get(reverse('project_add'), follow=True)
1318
        self.assertRedirects(r, reverse('project_list'))
1319
        self.assertContains(r, "You are not allowed to create a new project")
1320

    
1321
        # login
1322
        self.admin_client.get(reverse("edit_profile"))
1323
        # admin approves
1324
        r = self.admin_client.post(reverse('project_app_approve',
1325
                                           kwargs={'application_id': app1_id}),
1326
                                   follow=True)
1327
        self.assertEqual(r.status_code, 200)
1328

    
1329
        # project created
1330
        self.assertEqual(Project.objects.count(), 1)
1331

    
1332
        # login
1333
        self.member_client.get(reverse("edit_profile"))
1334
        # cannot join app2 (not approved yet)
1335
        join_url = reverse("project_join", kwargs={'chain_id': app2_id})
1336
        r = self.member_client.post(join_url, follow=True)
1337
        self.assertEqual(r.status_code, 403)
1338

    
1339
        # can join app1
1340
        self.member_client.get(reverse("edit_profile"))
1341
        join_url = reverse("project_join", kwargs={'chain_id': app1_id})
1342
        r = self.member_client.post(join_url, follow=True)
1343
        self.assertEqual(r.status_code, 200)
1344

    
1345
        memberships = ProjectMembership.objects.all()
1346
        self.assertEqual(len(memberships), 1)
1347
        memb_id = memberships[0].id
1348

    
1349
        reject_member_url = reverse('project_reject_member',
1350
                                    kwargs={'chain_id': app1_id, 'memb_id':
1351
                                            memb_id})
1352
        accept_member_url = reverse('project_accept_member',
1353
                                    kwargs={'chain_id': app1_id, 'memb_id':
1354
                                            memb_id})
1355

    
1356
        # only project owner is allowed to reject
1357
        r = self.member_client.post(reject_member_url, follow=True)
1358
        self.assertContains(r, "You do not have the permissions")
1359
        self.assertEqual(r.status_code, 200)
1360

    
1361
        # user (owns project) rejects membership
1362
        r = self.user_client.post(reject_member_url, follow=True)
1363
        self.assertEqual(ProjectMembership.objects.count(), 0)
1364

    
1365
        # user rejoins
1366
        self.member_client.get(reverse("edit_profile"))
1367
        join_url = reverse("project_join", kwargs={'chain_id': app1_id})
1368
        r = self.member_client.post(join_url, follow=True)
1369
        self.assertEqual(r.status_code, 200)
1370
        self.assertEqual(ProjectMembership.objects.count(), 1)
1371

    
1372
        # user (owns project) accepts membership
1373
        r = self.user_client.post(accept_member_url, follow=True)
1374
        self.assertEqual(ProjectMembership.objects.count(), 1)
1375
        membership = ProjectMembership.objects.get()
1376
        self.assertEqual(membership.state, ProjectMembership.ACCEPTED)
1377

    
1378
        user_quotas = quotas.get_users_quotas([self.member])
1379
        resource = 'service1.resource'
1380
        newlimit = user_quotas[self.member.uuid]['system'][resource]['limit']
1381
        # 100 from initial uplimit + 100 from project
1382
        self.assertEqual(newlimit, 200)
1383

    
1384
        remove_member_url = reverse('project_remove_member',
1385
                                    kwargs={'chain_id': app1_id, 'memb_id':
1386
                                            membership.id})
1387
        r = self.user_client.post(remove_member_url, follow=True)
1388
        self.assertEqual(r.status_code, 200)
1389

    
1390
        user_quotas = quotas.get_users_quotas([self.member])
1391
        resource = 'service1.resource'
1392
        newlimit = user_quotas[self.member.uuid]['system'][resource]['limit']
1393
        # 200 - 100 from project
1394
        self.assertEqual(newlimit, 100)
1395

    
1396

    
1397
ROOT = '/astakos/api/'
1398
u = lambda url: ROOT + url
1399

    
1400

    
1401
class QuotaAPITest(TestCase):
1402
    def test_0(self):
1403
        client = Client()
1404
        # custom service resources
1405
        service1 = Service.objects.create(
1406
            name="service1", api_url="http://service1.api")
1407
        resource11 = {"name": "service1.resource11",
1408
                      "desc": "resource11 desc",
1409
                      "allow_in_projects": True}
1410
        r, _ = resources.add_resource(service1, resource11)
1411
        resources.update_resource(r, 100)
1412
        resource12 = {"name": "service1.resource12",
1413
                      "desc": "resource11 desc",
1414
                      "unit": "bytes"}
1415
        r, _ = resources.add_resource(service1, resource12)
1416
        resources.update_resource(r, 1024)
1417

    
1418
        # create user
1419
        user = get_local_user('test@grnet.gr')
1420
        quotas.qh_sync_user(user)
1421

    
1422
        # create another service
1423
        service2 = Service.objects.create(
1424
            name="service2", api_url="http://service2.api")
1425
        resource21 = {"name": "service2.resource21",
1426
                      "desc": "resource11 desc",
1427
                      "allow_in_projects": False}
1428
        r, _ = resources.add_resource(service2, resource21)
1429
        resources.update_resource(r, 3)
1430

    
1431
        resource_names = [r['name'] for r in
1432
                          [resource11, resource12, resource21]]
1433

    
1434
        # get resources
1435
        r = client.get(u('resources'), follow=True)
1436
        self.assertEqual(r.status_code, 200)
1437
        body = json.loads(r.content)
1438
        for name in resource_names:
1439
            assertIn(name, body)
1440

    
1441
        # get quota
1442
        r = client.get(u('quotas'), follow=True)
1443
        self.assertEqual(r.status_code, 401)
1444

    
1445
        headers = {'HTTP_X_AUTH_TOKEN': user.auth_token}
1446
        r = client.get(u('quotas'), follow=True, **headers)
1447
        self.assertEqual(r.status_code, 200)
1448
        body = json.loads(r.content)
1449
        system_quota = body['system']
1450
        assertIn('system', body)
1451
        for name in resource_names:
1452
            assertIn(name, system_quota)
1453

    
1454
        r = client.get(u('service_quotas'), follow=True)
1455
        self.assertEqual(r.status_code, 401)
1456

    
1457
        s1_headers = {'HTTP_X_AUTH_TOKEN': service1.auth_token}
1458
        r = client.get(u('service_quotas'), follow=True, **s1_headers)
1459
        self.assertEqual(r.status_code, 200)
1460
        body = json.loads(r.content)
1461
        assertIn(user.uuid, body)
1462

    
1463
        r = client.get(u('commissions'), follow=True, **s1_headers)
1464
        self.assertEqual(r.status_code, 200)
1465
        body = json.loads(r.content)
1466
        self.assertEqual(body, [])
1467

    
1468
        # issue some commissions
1469
        commission_request = {
1470
            "force": False,
1471
            "auto_accept": False,
1472
            "name": "my commission",
1473
            "provisions": [
1474
                {
1475
                    "holder": user.uuid,
1476
                    "source": "system",
1477
                    "resource": resource11['name'],
1478
                    "quantity": 1
1479
                },
1480
                {
1481
                    "holder": user.uuid,
1482
                    "source": "system",
1483
                    "resource": resource12['name'],
1484
                    "quantity": 30000
1485
                }]}
1486

    
1487
        post_data = json.dumps(commission_request)
1488
        r = client.post(u('commissions'), post_data,
1489
                        content_type='application/json', **s1_headers)
1490
        self.assertEqual(r.status_code, 413)
1491

    
1492
        commission_request = {
1493
            "force": False,
1494
            "auto_accept": False,
1495
            "name": "my commission",
1496
            "provisions": [
1497
                {
1498
                    "holder": user.uuid,
1499
                    "source": "system",
1500
                    "resource": resource11['name'],
1501
                    "quantity": 1
1502
                },
1503
                {
1504
                    "holder": user.uuid,
1505
                    "source": "system",
1506
                    "resource": resource12['name'],
1507
                    "quantity": 100
1508
                }]}
1509

    
1510
        post_data = json.dumps(commission_request)
1511
        r = client.post(u('commissions'), post_data,
1512
                        content_type='application/json', **s1_headers)
1513
        self.assertEqual(r.status_code, 201)
1514
        body = json.loads(r.content)
1515
        serial = body['serial']
1516
        self.assertEqual(serial, 1)
1517

    
1518
        post_data = json.dumps(commission_request)
1519
        r = client.post(u('commissions'), post_data,
1520
                        content_type='application/json', **s1_headers)
1521
        self.assertEqual(r.status_code, 201)
1522
        body = json.loads(r.content)
1523
        self.assertEqual(body['serial'], 2)
1524

    
1525
        post_data = json.dumps(commission_request)
1526
        r = client.post(u('commissions'), post_data,
1527
                        content_type='application/json', **s1_headers)
1528
        self.assertEqual(r.status_code, 201)
1529
        body = json.loads(r.content)
1530
        self.assertEqual(body['serial'], 3)
1531

    
1532
        r = client.get(u('commissions'), follow=True, **s1_headers)
1533
        self.assertEqual(r.status_code, 200)
1534
        body = json.loads(r.content)
1535
        self.assertEqual(body, [1, 2, 3])
1536

    
1537
        r = client.get(u('commissions/' + str(serial)), follow=True,
1538
                       **s1_headers)
1539
        self.assertEqual(r.status_code, 200)
1540
        body = json.loads(r.content)
1541
        self.assertEqual(body['serial'], serial)
1542
        assertIn('issue_time', body)
1543
        self.assertEqual(body['provisions'], commission_request['provisions'])
1544
        self.assertEqual(body['name'], commission_request['name'])
1545

    
1546
        r = client.get(u('service_quotas?user=' + user.uuid),
1547
                       follow=True, **s1_headers)
1548
        self.assertEqual(r.status_code, 200)
1549
        body = json.loads(r.content)
1550
        user_quota = body[user.uuid]
1551
        system_quota = user_quota['system']
1552
        r11 = system_quota[resource11['name']]
1553
        self.assertEqual(r11['usage'], 3)
1554
        self.assertEqual(r11['pending'], 3)
1555

    
1556
        # resolve pending commissions
1557
        resolve_data = {
1558
            "accept": [1, 3],
1559
            "reject": [2, 3, 4],
1560
        }
1561
        post_data = json.dumps(resolve_data)
1562

    
1563
        r = client.post(u('commissions/action'), post_data,
1564
                        content_type='application/json', **s1_headers)
1565
        self.assertEqual(r.status_code, 200)
1566
        body = json.loads(r.content)
1567
        self.assertEqual(body['accepted'], [1])
1568
        self.assertEqual(body['rejected'], [2])
1569
        failed = body['failed']
1570
        self.assertEqual(len(failed), 2)
1571

    
1572
        r = client.get(u('commissions/' + str(serial)), follow=True,
1573
                       **s1_headers)
1574
        self.assertEqual(r.status_code, 404)
1575

    
1576
        # auto accept
1577
        commission_request = {
1578
            "auto_accept": True,
1579
            "name": "my commission",
1580
            "provisions": [
1581
                {
1582
                    "holder": user.uuid,
1583
                    "source": "system",
1584
                    "resource": resource11['name'],
1585
                    "quantity": 1
1586
                },
1587
                {
1588
                    "holder": user.uuid,
1589
                    "source": "system",
1590
                    "resource": resource12['name'],
1591
                    "quantity": 100
1592
                }]}
1593

    
1594
        post_data = json.dumps(commission_request)
1595
        r = client.post(u('commissions'), post_data,
1596
                        content_type='application/json', **s1_headers)
1597
        self.assertEqual(r.status_code, 201)
1598
        body = json.loads(r.content)
1599
        serial = body['serial']
1600
        self.assertEqual(serial, 4)
1601

    
1602
        r = client.get(u('commissions/' + str(serial)), follow=True,
1603
                       **s1_headers)
1604
        self.assertEqual(r.status_code, 404)
1605

    
1606
        # malformed
1607
        commission_request = {
1608
            "auto_accept": True,
1609
            "name": "my commission",
1610
            "provisions": [
1611
                {
1612
                    "holder": user.uuid,
1613
                    "source": "system",
1614
                    "resource": resource11['name'],
1615
                }
1616
            ]}
1617

    
1618
        post_data = json.dumps(commission_request)
1619
        r = client.post(u('commissions'), post_data,
1620
                        content_type='application/json', **s1_headers)
1621
        self.assertEqual(r.status_code, 400)
1622

    
1623
        commission_request = {
1624
            "auto_accept": True,
1625
            "name": "my commission",
1626
            "provisions": "dummy"}
1627

    
1628
        post_data = json.dumps(commission_request)
1629
        r = client.post(u('commissions'), post_data,
1630
                        content_type='application/json', **s1_headers)
1631
        self.assertEqual(r.status_code, 400)
1632

    
1633
        r = client.post(u('commissions'), commission_request,
1634
                        content_type='application/json', **s1_headers)
1635
        self.assertEqual(r.status_code, 400)
1636

    
1637
        # no holding
1638
        commission_request = {
1639
            "auto_accept": True,
1640
            "name": "my commission",
1641
            "provisions": [
1642
                {
1643
                    "holder": user.uuid,
1644
                    "source": "system",
1645
                    "resource": "non existent",
1646
                    "quantity": 1
1647
                },
1648
                {
1649
                    "holder": user.uuid,
1650
                    "source": "system",
1651
                    "resource": resource12['name'],
1652
                    "quantity": 100
1653
                }]}
1654

    
1655
        post_data = json.dumps(commission_request)
1656
        r = client.post(u('commissions'), post_data,
1657
                        content_type='application/json', **s1_headers)
1658
        self.assertEqual(r.status_code, 404)
1659

    
1660
        # release
1661
        commission_request = {
1662
            "provisions": [
1663
                {
1664
                    "holder": user.uuid,
1665
                    "source": "system",
1666
                    "resource": resource11['name'],
1667
                    "quantity": -1
1668
                }
1669
            ]}
1670

    
1671
        post_data = json.dumps(commission_request)
1672
        r = client.post(u('commissions'), post_data,
1673
                        content_type='application/json', **s1_headers)
1674
        self.assertEqual(r.status_code, 201)
1675
        body = json.loads(r.content)
1676
        serial = body['serial']
1677

    
1678
        accept_data = {'accept': ""}
1679
        post_data = json.dumps(accept_data)
1680
        r = client.post(u('commissions/' + str(serial) + '/action'), post_data,
1681
                        content_type='application/json', **s1_headers)
1682
        self.assertEqual(r.status_code, 200)
1683

    
1684
        reject_data = {'reject': ""}
1685
        post_data = json.dumps(accept_data)
1686
        r = client.post(u('commissions/' + str(serial) + '/action'), post_data,
1687
                        content_type='application/json', **s1_headers)
1688
        self.assertEqual(r.status_code, 404)
1689

    
1690
        # force
1691
        commission_request = {
1692
            "force": True,
1693
            "provisions": [
1694
                {
1695
                    "holder": user.uuid,
1696
                    "source": "system",
1697
                    "resource": resource11['name'],
1698
                    "quantity": 100
1699
                }]}
1700

    
1701
        post_data = json.dumps(commission_request)
1702
        r = client.post(u('commissions'), post_data,
1703
                        content_type='application/json', **s1_headers)
1704
        self.assertEqual(r.status_code, 201)
1705
        body = json.loads(r.content)
1706

    
1707
        r = client.get(u('quotas'), **headers)
1708
        self.assertEqual(r.status_code, 200)
1709
        body = json.loads(r.content)
1710
        system_quota = body['system']
1711
        r11 = system_quota[resource11['name']]
1712
        self.assertEqual(r11['usage'], 102)
1713
        self.assertEqual(r11['pending'], 101)