Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / userdata / tests.py @ aaaf78f2

History | View | Annotate | Download (8.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
#
34

    
35
from django import http
36
from django.test import TransactionTestCase
37
from django.conf import settings
38
from django.test.client import Client
39
from django.core.urlresolvers import clear_url_caches
40
from django.utils import simplejson as json
41
from django.conf import settings
42
from django.core.urlresolvers import reverse
43

    
44
from synnefo.userdata.models import *
45

    
46

    
47
class AaiClient(Client):
48

    
49
    def request(self, **request):
50
        request['HTTP_X_AUTH_TOKEN'] = '0000'
51
        return super(AaiClient, self).request(**request)
52

    
53
class TestRestViews(TransactionTestCase):
54

    
55
    fixtures = ['users']
56

    
57
    def setUp(self):
58
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 10
59

    
60
        def get_user_mock(request, *Args, **kwargs):
61
            if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0000':
62
                request.user_uniq = 'test'
63
                request.user = {'id': 'id',
64
                                'username': 'username',
65
                                'uuid': 'test'}
66

    
67
        # mock the astakos authentication function
68
        from snf_django.lib import astakos
69
        astakos.get_user = get_user_mock
70

    
71
        settings.SKIP_SSH_VALIDATION = True
72
        self.client = AaiClient()
73
        self.user = 'test'
74
        self.keys_url = reverse('ui_keys_collection')
75

    
76
    def test_keys_collection_get(self):
77
        resp = self.client.get(self.keys_url)
78
        self.assertEqual(resp.content, "[]")
79

    
80
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
81
                content="content1")
82

    
83
        resp = self.client.get(self.keys_url)
84
        resp_list = json.loads(resp.content);
85
        exp_list = [{"content": "content1", "id": 1,
86
                    "uri": self.keys_url + "/1", "name": "key pair 1",
87
                    "fingerprint": "unknown fingerprint"}]
88
        self.assertEqual(resp_list, exp_list)
89

    
90
        PublicKeyPair.objects.create(user=self.user, name="key pair 2",
91
                content="content2")
92

    
93
        resp = self.client.get(self.keys_url)
94
        resp_list = json.loads(resp.content)
95
        exp_list = [{"content": "content1", "id": 1,
96
                     "uri": self.keys_url + "/1", "name": "key pair 1",
97
                     "fingerprint": "unknown fingerprint"},
98
                    {"content": "content2", "id": 2,
99
                     "uri": self.keys_url + "/2",
100
                     "name": "key pair 2",
101
                     "fingerprint": "unknown fingerprint"}]
102

    
103
        self.assertEqual(resp_list, exp_list)
104

    
105
    def test_keys_resourse_get(self):
106
        resp = self.client.get(self.keys_url + "/1")
107
        self.assertEqual(resp.status_code, 404)
108

    
109
        # create a public key
110
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
111
                content="content1")
112
        resp = self.client.get(self.keys_url + "/1")
113
        resp_dict = json.loads(resp.content);
114
        exp_dict = {"content": "content1", "id": 1,
115
                    "uri": self.keys_url + "/1", "name": "key pair 1",
116
                    "fingerprint": "unknown fingerprint"}
117
        self.assertEqual(resp_dict, exp_dict)
118

    
119
        # update
120
        resp = self.client.put(self.keys_url + "/1",
121
                               json.dumps({'name':'key pair 1 new name'}),
122
                               content_type='application/json')
123

    
124
        pk = PublicKeyPair.objects.get(pk=1)
125
        self.assertEqual(pk.name, "key pair 1 new name")
126

    
127
        # delete
128
        resp = self.client.delete(self.keys_url + "/1")
129
        self.assertEqual(PublicKeyPair.objects.count(), 0)
130

    
131
        resp = self.client.get(self.keys_url + "/1")
132
        self.assertEqual(resp.status_code, 404)
133

    
134
        resp = self.client.get(self.keys_url)
135
        self.assertEqual(resp.content, "[]")
136

    
137
        # test rest create
138
        resp = self.client.post(self.keys_url,
139
                                json.dumps({'name':'key pair 2',
140
                                            'content':"""key 2 content"""}),
141
                                content_type='application/json')
142
        self.assertEqual(PublicKeyPair.objects.count(), 1)
143
        pk = PublicKeyPair.objects.get()
144
        self.assertEqual(pk.name, "key pair 2")
145
        self.assertEqual(pk.content, "key 2 content")
146

    
147
    def test_generate_views(self):
148
        import base64
149

    
150
        # just test that
151
        resp = self.client.post(self.keys_url + "/generate")
152
        self.assertNotEqual(resp, "")
153

    
154
        data = json.loads(resp.content)
155
        self.assertEqual(data.has_key('private'), True)
156
        self.assertEqual(data.has_key('private'), True)
157

    
158
        # public key is base64 encoded
159
        base64.b64decode(data['public'].replace("ssh-rsa ",""))
160

    
161
        # remove header/footer
162
        private = "".join(data['private'].split("\n")[1:-1])
163

    
164
        # private key is base64 encoded
165
        base64.b64decode(private)
166

    
167
        new_key = PublicKeyPair()
168
        new_key.content = data['public']
169
        new_key.name = "new key"
170
        new_key.user = 'test'
171
        new_key.full_clean()
172
        new_key.save()
173

    
174
    def test_generate_limit(self):
175
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 1
176
        resp = self.client.post(self.keys_url,
177
                                json.dumps({'name':'key1',
178
                                            'content':"""key 1 content"""}),
179
                                content_type='application/json')
180
        genpath = self.keys_url + "/generate"
181
        r = self.client.post(genpath)
182
        assert isinstance(r, http.HttpResponseServerError)
183

    
184
    def test_invalid_data(self):
185
        resp = self.client.post(self.keys_url,
186
                                json.dumps({'content':"""key 2 content"""}),
187
                                content_type='application/json')
188

    
189
        self.assertEqual(resp.status_code, 500)
190
        self.assertEqual(resp.content, """{"non_field_key": "__all__", "errors": """
191
                                       """{"name": ["This field cannot be blank."]}}""")
192

    
193
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 2
194

    
195
        # test ssh limit
196
        resp = self.client.post(self.keys_url,
197
                                json.dumps({'name':'key1',
198
                                            'content':"""key 1 content"""}),
199
                                content_type='application/json')
200
        resp = self.client.post(self.keys_url,
201
                                json.dumps({'name':'key1',
202
                                            'content':"""key 1 content"""}),
203
                                content_type='application/json')
204
        resp = self.client.post(self.keys_url,
205
                                json.dumps({'name':'key1',
206
                                            'content':"""key 1 content"""}),
207
                                content_type='application/json')
208
        self.assertEqual(resp.status_code, 500)
209
        self.assertEqual(resp.content, """{"non_field_key": "__all__", "errors": """
210
                                       """{"__all__": ["SSH keys limit exceeded."]}}""")
211