Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / userdata / tests.py @ 19b2c29d

History | View | Annotate | Download (9.8 kB)

1
# Copyright 2011, 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
#
34

    
35
from django import http
36
from django.test import TransactionTestCase
37
from django.conf import settings
38
from django.test.client import Client
39
from django.utils import simplejson as json
40
from django.core.urlresolvers import reverse
41
from django.core.validators import MaxLengthValidator
42

    
43
from snf_django.utils.testing import with_settings
44

    
45
from mock import patch
46

    
47
from synnefo.userdata.models import PublicKeyPair, SSH_KEY_MAX_CONTENT_LENGTH
48

    
49

    
50
def get_user_mock(request, *args, **kwargs):
51
    if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0000':
52
        request.user_uniq = 'test'
53
        request.user = {"access": {
54
                        "token": {
55
                            "expires": "2013-06-19T15:23:59.975572+00:00",
56
                            "id": "token",
57
                            "tenant": {
58
                                "id": "test",
59
                                "name": "Firstname Lastname"
60
                                }
61
                            },
62
                        "serviceCatalog": [],
63
                        "user": {
64
                            "roles_links": [],
65
                            "id": "test",
66
                            "roles": [{"id": 1, "name": "default"}],
67
                            "name": "Firstname Lastname"}}
68
                        }
69

    
70

    
71
class AaiClient(Client):
72

    
73
    def request(self, **request):
74
        # mock the astakos authentication function
75
        with patch("synnefo.userdata.rest.get_user",
76
                   new=get_user_mock):
77
            with patch("synnefo.userdata.views.get_user",
78
                       new=get_user_mock):
79
                request['HTTP_X_AUTH_TOKEN'] = '0000'
80
                return super(AaiClient, self).request(**request)
81

    
82

    
83
class TestRestViews(TransactionTestCase):
84

    
85
    fixtures = ['users']
86

    
87
    def setUp(self):
88
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 10
89

    
90
        settings.SKIP_SSH_VALIDATION = True
91
        self.client = AaiClient()
92
        self.user = 'test'
93
        self.keys_url = reverse('ui_keys_collection')
94

    
95
    def test_key_content_length_limit(self):
96
        # patch validator
97
        PublicKeyPair._meta.fields[3].validators = [MaxLengthValidator(10)]
98
        resp = self.client.post(self.keys_url,
99
                                json.dumps({'name': 'key pair 2',
100
                                            'content': """0123456789+"""}),
101
                                content_type='application/json')
102
        self.assertEqual(resp.status_code, 422)
103
        resp = json.loads(resp.content)
104
        assert 'errors' in resp
105
        assert 'content' in resp['errors']
106
        self.assertEqual(PublicKeyPair.objects.count(), 0)
107
        PublicKeyPair._meta.fields[3].validators = \
108
            [MaxLengthValidator(SSH_KEY_MAX_CONTENT_LENGTH)]
109

    
110
    def test_keys_collection_get(self):
111
        resp = self.client.get(self.keys_url)
112
        self.assertEqual(resp.content, "[]")
113

    
114
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
115
                                     content="content1")
116

    
117
        resp = self.client.get(self.keys_url)
118
        resp_list = json.loads(resp.content)
119
        exp_list = [{"content": "content1", "id": 1,
120
                     "uri": self.keys_url + "/1", "name": "key pair 1",
121
                     "fingerprint": "unknown fingerprint"}]
122
        self.assertEqual(resp_list, exp_list)
123

    
124
        PublicKeyPair.objects.create(user=self.user, name="key pair 2",
125
                                     content="content2")
126

    
127
        resp = self.client.get(self.keys_url)
128
        resp_list = json.loads(resp.content)
129
        exp_list = [{"content": "content1", "id": 1,
130
                     "uri": self.keys_url + "/1", "name": "key pair 1",
131
                     "fingerprint": "unknown fingerprint"},
132
                    {"content": "content2", "id": 2,
133
                     "uri": self.keys_url + "/2",
134
                     "name": "key pair 2",
135
                     "fingerprint": "unknown fingerprint"}]
136

    
137
        self.assertEqual(resp_list, exp_list)
138

    
139
    def test_keys_resourse_get(self):
140
        resp = self.client.get(self.keys_url + "/1")
141
        self.assertEqual(resp.status_code, 404)
142

    
143
        # create a public key
144
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
145
                                     content="content1")
146
        resp = self.client.get(self.keys_url + "/1")
147
        resp_dict = json.loads(resp.content)
148
        exp_dict = {"content": "content1", "id": 1,
149
                    "uri": self.keys_url + "/1", "name": "key pair 1",
150
                    "fingerprint": "unknown fingerprint"}
151
        self.assertEqual(resp_dict, exp_dict)
152

    
153
        # update
154
        resp = self.client.put(self.keys_url + "/1",
155
                               json.dumps({'name': 'key pair 1 new name'}),
156
                               content_type='application/json')
157

    
158
        pk = PublicKeyPair.objects.get(pk=1)
159
        self.assertEqual(pk.name, "key pair 1 new name")
160

    
161
        # delete
162
        resp = self.client.delete(self.keys_url + "/1")
163
        self.assertEqual(PublicKeyPair.objects.count(), 0)
164

    
165
        resp = self.client.get(self.keys_url + "/1")
166
        self.assertEqual(resp.status_code, 404)
167

    
168
        resp = self.client.get(self.keys_url)
169
        self.assertEqual(resp.content, "[]")
170

    
171
        # test rest create
172
        resp = self.client.post(self.keys_url,
173
                                json.dumps({'name': 'key pair 2',
174
                                            'content': """key 2 content"""}),
175
                                content_type='application/json')
176
        self.assertEqual(PublicKeyPair.objects.count(), 1)
177
        pk = PublicKeyPair.objects.get()
178
        self.assertEqual(pk.name, "key pair 2")
179
        self.assertEqual(pk.content, "key 2 content")
180

    
181
    def test_generate_views(self):
182
        import base64
183

    
184
        # just test that
185
        resp = self.client.post(self.keys_url + "/generate")
186
        self.assertNotEqual(resp, "")
187

    
188
        data = json.loads(resp.content)
189
        self.assertEqual('private' in data, True)
190
        self.assertEqual('private' in data, True)
191

    
192
        # public key is base64 encoded
193
        base64.b64decode(data['public'].replace("ssh-rsa ", ""))
194

    
195
        # remove header/footer
196
        private = "".join(data['private'].split("\n")[1:-1])
197

    
198
        # private key is base64 encoded
199
        base64.b64decode(private)
200

    
201
        new_key = PublicKeyPair()
202
        new_key.content = data['public']
203
        new_key.name = "new key"
204
        new_key.user = 'test'
205
        new_key.full_clean()
206
        new_key.save()
207

    
208
    def test_generate_limit(self):
209
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 1
210
        self.client.post(self.keys_url,
211
                         json.dumps({'name': 'key1',
212
                                     'content': """key 1 content"""}),
213
                         content_type='application/json')
214
        genpath = self.keys_url + "/generate"
215
        r = self.client.post(genpath)
216
        assert isinstance(r, http.HttpResponseServerError)
217

    
218
    def test_invalid_data(self):
219
        resp = self.client.post(self.keys_url,
220
                                json.dumps({'content': """key 2 content"""}),
221
                                content_type='application/json')
222

    
223
        self.assertEqual(resp.status_code, 422)
224
        self.assertEqual(resp.content,
225
                         """{"non_field_key": "__all__", "errors": """
226
                         """{"name": ["This field cannot be blank."]}}""")
227

    
228
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 2
229

    
230
        # test ssh limit
231
        resp = self.client.post(self.keys_url,
232
                                json.dumps({'name': 'key1',
233
                                            'content': """key 1 content"""}),
234
                                content_type='application/json')
235
        resp = self.client.post(self.keys_url,
236
                                json.dumps({'name': 'key1',
237
                                            'content': """key 1 content"""}),
238
                                content_type='application/json')
239
        resp = self.client.post(self.keys_url,
240
                                json.dumps({'name': 'key1',
241
                                            'content': """key 1 content"""}),
242
                                content_type='application/json')
243
        self.assertEqual(resp.status_code, 422)
244
        self.assertEqual(resp.content,
245
                         """{"non_field_key": "__all__", "errors": """
246
                         """{"__all__": ["SSH keys limit exceeded."]}}""")