Revision faa26af8

b/api/fixtures/api_test_data.json
250 250
            "created": "2011-02-06 00:00:00",
251 251
            "updated": "2011-02-06 00:00:00",
252 252
            "state": "ACTIVE",
253
            "description": "Full Debian Squeeze Installation",
254 253
            "size": 5678
255 254
        }
256 255
    },
......
330 329
            "created": "2011-02-10 00:00:00",
331 330
            "updated": "2011-02-10 00:00:00",
332 331
            "state": "ACTIVE",
333
            "description": "Full Slackware 13.1 Installation",
334 332
            "size": 1234,
335 333
            "owner" : 1,
336 334
            "sourcevm": 1001
b/api/fixtures/auth_test_data.json
3 3
        "model": "db.SynnefoUser",
4 4
        "pk": 1,
5 5
        "fields": {
6
            "name": "Test User",
7
            "credit": 1024,
8
            "created": "2011-02-06 00:00:00",
9
            "updated": "2011-02-06 00:00:00"
10
        }
6
            "name": "testdbuser",
7
            "realname" :"test db user",
8
            "uniq" :"test@synnefo.gr",
9
            "credit": 10,
10
            "auth_token": "46e427d657b20defe352804f0eb6f8a2",
11
            "auth_token_created": "2009-04-07 09:17:14",
12
            "type": "STUDENT",
13
            "created": "2011-02-06"
14
   	    }
11 15
    }
12 16
]
b/api/middleware.py
3 3
from django.http import HttpResponse, HttpResponseRedirect
4 4
from synnefo.db.models import SynnefoUser
5 5
from synnefo.logic.shibboleth import Tokens, register_shibboleth_user
6
import time
6 7

  
7 8
class SynnefoAuthMiddleware(object):
8 9

  
......
13 14
    def process_request(self, request):
14 15

  
15 16
        if self.auth_token in request.META:
17
            user = None
16 18
            #Retrieve user from DB or other caching mechanism
17
            user = SynnefoUser.objects.filter(auth_token = request.META[self.auth_token])
18
            if user is None :
19
            try:
20
                user = SynnefoUser.objects.get(auth_token = request.META[self.auth_token])
21
            except SynnefoUser.DoesNotExist:
22
                return HttpResponseRedirect(settings.SHIBBOLETH_HOST)
23

  
24
            #Check user's auth token
25
            if (time.time() -
26
                time.mktime(user.auth_token_created.timetuple()) +
27
                settings.AUTH_TOKEN_DURATION * 3600) > 0:
28
                #The user's token has expired, re-login
19 29
                return HttpResponseRedirect(settings.SHIBBOLETH_HOST)
30

  
20 31
            request.user = user
21 32
            return
22 33

  
23
        #A user authenticated by Shibboleth
34
        #A user authenticated by Shibboleth, must include a uniq id
24 35
        if Tokens.SIB_EDU_PERSON_PRINCIPAL_NAME in request.META:
25 36
            #TODO: We must somehow make sure that we only process
26 37
            #      SIB headers when coming from a URL whitelist,
27
            #      or a similar for of restriction
38
            #      or a similar form of restriction
28 39
            if request.get_host() not in settings.SHIBBOLETH_WHITELIST.keys():
29 40
                return HttpResponseRedirect(settings.SHIBBOLETH_HOST)
30 41

  
......
44 55
                #Registration failed, redirect to Shibboleth
45 56
                return HttpResponseRedirect(settings.SHIBBOLETH_HOST)
46 57

  
47
            #At this point, the user has been identified in our database
48
            #Check user's auth token
49
            if time() - user.auth_token_created > settings.AUTH_TOKEN_DURATION * 3600:
50
                #The user's token has expired, re-login
51
                return HttpResponseRedirect(settings.SHIBBOLETH_HOST)
52

  
53 58
            #User and authentication token valid, user allowed to proceed
54 59
            return
55 60
            
......
73 78
        #caching of results
74 79
        response['Vary'] = self.auth_key
75 80
        return response
76

  
77
#class HttpResponseAuthenticationRequired(HttpResponse):
78
#    status_code = 401
b/api/tests_auth.py
8 8

  
9 9
from django.test import TestCase
10 10
from django.test.client import Client
11
from django.conf import settings
11 12

  
12 13
from synnefo.logic.shibboleth import Tokens, NoUniqueToken
13 14
from synnefo.db.models import SynnefoUser
14 15

  
16
from datetime import datetime, timedelta
17

  
15 18
class AuthTestCase(TestCase):
16
    fixtures = ['api_test_data']
19
    fixtures = ['api_test_data', 'auth_test_data']
17 20
    apibase = '/api/v1.1'
18 21

  
19 22
    def setUp(self):
......
32 35
        except SynnefoUser.DoesNotExist:
33 36
            self.assertNotEqual(user, None)
34 37
        self.assertNotEqual(user, None)
38
        self.assertTrue('X-Auth-Token' in response.META)
39
        self.assertTrue(len(response['X-Auth-Token']))
35 40

  
36 41
    def test_shibboleth_no_uniq_request(self):
37 42
        """test a request with no unique field
38 43
        """
39
        try :
40
            response = self.client.get(self.apibase + '/servers', {},
41
                                   **{Tokens.SIB_GIVEN_NAME: 'Jimmy',
42
                                      Tokens.SIB_DISPLAY_NAME: 'Jimmy Hendrix'})
43
            self.assertEqual(True, True)
44
        except NoUniqueToken:
45
            self.assertEqual(True, True)
44
        response = self.client.get(self.apibase + '/servers', {},
45
                                    **{Tokens.SIB_GIVEN_NAME: 'Jimmy',
46
                                    Tokens.SIB_DISPLAY_NAME: 'Jimmy Hendrix'})
47
        self._test_redirect(response)
46 48

  
47 49
    def test_shibboleth_wrong_from_request(self):
48 50
        """ test request from wrong host
49 51
        """
50
        #TODO: Test request from wrong host
51
        #self.client
52
        #response = self.client.get(self.apibase + '/servers', {},
53
        #                           **{Tokens.SIB_GIVEN_NAME: 'Jimmy',
54
        #                              Tokens.SIB_EDU_PERSON_PRINCIPAL_NAME: 'jh@gmail.com',
55
        #                              Tokens.SIB_DISPLAY_NAME: 'Jimmy Hendrix'})
52
        response = self.client.get(self.apibase + '/servers', {},
53
                                   **{Tokens.SIB_GIVEN_NAME: 'Jimmy',
54
                                      Tokens.SIB_EDU_PERSON_PRINCIPAL_NAME: 'jh@gmail.com',
55
                                      Tokens.SIB_DISPLAY_NAME: 'Jimmy Hendrix',
56
                                      'REMOTE_ADDR': '1.2.3.4',
57
                                      'SERVER_NAME': 'nohost.nodomain'})
58
        self._test_redirect(response)
56 59

  
57 60
    def test_shibboleth_expired_token(self):
58 61
        """ test request from expired token
59 62
        """
63
        user = SynnefoUser.objects.get(uniq = "test@synnefo.gr")
64
        self.assertNotEqual(user.auth_token_created, None)
65
        user.auth_token_created = (datetime.now() -
66
                                   timedelta(hours = settings.AUTH_TOKEN_DURATION))
67
        user.save()
68
        response = self.client.get(self.apibase + '/servers', {},
69
                                   **{'X-Auth-Token': user.auth_token})
70
        self._test_redirect(response)
60 71

  
61
        #response = self.client.get(self.apibase + '/servers', {},
62
        #                           **{Tokens.SIB_GIVEN_NAME: 'Jimmy',
63
        #                              Tokens.SIB_EDU_PERSON_PRINCIPAL_NAME: 'jh@gmail.com',
64
        #                              Tokens.SIB_DISPLAY_NAME: 'Jimmy Hendrix'})
65

  
66
    def test_auth_shibboleth(self):
72
    def test_shibboleth_auth(self):
67 73
        """ test redirect to shibboleth page
68 74
        """
69 75
        response = self.client.get(self.apibase + '/servers')
70
        self.assertEquals(response.status_code, 302)
76
        user = SynnefoUser.objects.get(uniq = "test@synnefo.gr")
77
        self.assertTrue('X-Auth-Token' in response.META)
71 78

  
72 79
    def test_fail_oapi_auth(self):
73 80
        """ test authentication from not registered user using OpenAPI
......
94 101
        response = self.client.get(self.apibase + '/servers/detail', {},
95 102
                                   **{'X-Auth-Token': token})
96 103
        self.assertEquals(response.status_code, 200)
104

  
105
    def _test_redirect(self, response):
106
        self.assertEquals(response.status_code, 302)
107
        self.assertEquals('Location' in response.META)
108
        self.assertEquals(response['Location'], settings.SHIBBOLETH_HOST)
b/db/fixtures/db_test_data.json
8 8
            "uniq" :"test@synnefo.gr",
9 9
            "credit": 10,
10 10
            "auth_token": "46e427d657b20defe352804f0eb6f8a2",
11
            "auth_token_created": "2009-04-07 09:17:14",
11 12
            "type": "STUDENT",
12 13
            "created": "2011-02-06"
13 14
   	    }
b/db/models.py
18 18
    uniq = models.CharField('External Unique ID', max_length=255)
19 19
    credit = models.IntegerField('Credit Balance')
20 20
    auth_token = models.CharField('Authentication Token', max_length=32, null=True)
21
    auth_token_created = models.DateTimeField('Time of auth token creation', null=True)
21
    auth_token_created = models.DateTimeField('Time of auth token creation', auto_now_add=True)
22 22
    type = models.CharField('Current Image State', choices=ACCOUNT_TYPE, max_length=30)
23 23
    created = models.DateTimeField('Time of creation', auto_now_add=True)
24 24
    updated = models.DateTimeField('Time of last update', auto_now=True)
b/logic/shibboleth.py
18 18
    SIB_GR_EDU_PERSON_UNDERGRADUATE_BRANCH = "grEduPersonUndergraduateBranch"
19 19

  
20 20
class NoUniqueToken(object):
21

  
22
    def __init__(self, msg):
23
        self.msg = msg
24
    
25
    pass
26

  
27
class NoRealName(object):
28

  
29
    def __init__(self, msg):
30
        self.msg = msg
31

  
21 32
    pass
22 33

  
23 34
def register_shibboleth_user(tokens):
......
39 50
    unq = tokens.get(Tokens.SIB_EDU_PERSON_PRINCIPAL_NAME)
40 51

  
41 52
    if unq is None:
42
        raise NoUniqueToken
53
        raise NoUniqueToken("Authentication does not return a unique token")
54

  
55
    if realname is None:
56
        raise NoRealName("Authentication does not return the user's name")
43 57

  
44 58
    if is_student:
45 59
        users.register_student(realname, '' ,unq)

Also available in: Unified diff