Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / userdata / tests.py @ d3e9d04b

History | View | Annotate | Download (8.4 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
#
34

    
35
from django import http
36
from django.test import TransactionTestCase
37
from django.conf import settings
38
from django.test.client import Client
39
from django.utils import simplejson as json
40
from django.core.urlresolvers import reverse
41
from mock import patch
42

    
43
from synnefo.userdata.models import PublicKeyPair
44

    
45

    
46
def get_user_mock(request, *args, **kwargs):
47
    if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0000':
48
        request.user_uniq = 'test'
49
        request.user = {'id': 'id',
50
                        'username': 'username',
51
                        'uuid': 'test'}
52

    
53

    
54
class AaiClient(Client):
55

    
56
    def request(self, **request):
57
        # mock the astakos authentication function
58
        with patch("synnefo.userdata.rest.get_user",
59
                   new=get_user_mock):
60
            with patch("synnefo.userdata.views.get_user",
61
                       new=get_user_mock):
62
                request['HTTP_X_AUTH_TOKEN'] = '0000'
63
                return super(AaiClient, self).request(**request)
64

    
65

    
66
class TestRestViews(TransactionTestCase):
67

    
68
    fixtures = ['users']
69

    
70
    def setUp(self):
71
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 10
72

    
73
        settings.SKIP_SSH_VALIDATION = True
74
        self.client = AaiClient()
75
        self.user = 'test'
76
        self.keys_url = reverse('ui_keys_collection')
77

    
78
    def test_keys_collection_get(self):
79
        resp = self.client.get(self.keys_url)
80
        self.assertEqual(resp.content, "[]")
81

    
82
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
83
                                     content="content1")
84

    
85
        resp = self.client.get(self.keys_url)
86
        resp_list = json.loads(resp.content)
87
        exp_list = [{"content": "content1", "id": 1,
88
                     "uri": self.keys_url + "/1", "name": "key pair 1",
89
                     "fingerprint": "unknown fingerprint"}]
90
        self.assertEqual(resp_list, exp_list)
91

    
92
        PublicKeyPair.objects.create(user=self.user, name="key pair 2",
93
                                     content="content2")
94

    
95
        resp = self.client.get(self.keys_url)
96
        resp_list = json.loads(resp.content)
97
        exp_list = [{"content": "content1", "id": 1,
98
                     "uri": self.keys_url + "/1", "name": "key pair 1",
99
                     "fingerprint": "unknown fingerprint"},
100
                    {"content": "content2", "id": 2,
101
                     "uri": self.keys_url + "/2",
102
                     "name": "key pair 2",
103
                     "fingerprint": "unknown fingerprint"}]
104

    
105
        self.assertEqual(resp_list, exp_list)
106

    
107
    def test_keys_resourse_get(self):
108
        resp = self.client.get(self.keys_url + "/1")
109
        self.assertEqual(resp.status_code, 404)
110

    
111
        # create a public key
112
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
113
                                     content="content1")
114
        resp = self.client.get(self.keys_url + "/1")
115
        resp_dict = json.loads(resp.content)
116
        exp_dict = {"content": "content1", "id": 1,
117
                    "uri": self.keys_url + "/1", "name": "key pair 1",
118
                    "fingerprint": "unknown fingerprint"}
119
        self.assertEqual(resp_dict, exp_dict)
120

    
121
        # update
122
        resp = self.client.put(self.keys_url + "/1",
123
                               json.dumps({'name': 'key pair 1 new name'}),
124
                               content_type='application/json')
125

    
126
        pk = PublicKeyPair.objects.get(pk=1)
127
        self.assertEqual(pk.name, "key pair 1 new name")
128

    
129
        # delete
130
        resp = self.client.delete(self.keys_url + "/1")
131
        self.assertEqual(PublicKeyPair.objects.count(), 0)
132

    
133
        resp = self.client.get(self.keys_url + "/1")
134
        self.assertEqual(resp.status_code, 404)
135

    
136
        resp = self.client.get(self.keys_url)
137
        self.assertEqual(resp.content, "[]")
138

    
139
        # test rest create
140
        resp = self.client.post(self.keys_url,
141
                                json.dumps({'name': 'key pair 2',
142
                                            'content': """key 2 content"""}),
143
                                content_type='application/json')
144
        self.assertEqual(PublicKeyPair.objects.count(), 1)
145
        pk = PublicKeyPair.objects.get()
146
        self.assertEqual(pk.name, "key pair 2")
147
        self.assertEqual(pk.content, "key 2 content")
148

    
149
    def test_generate_views(self):
150
        import base64
151

    
152
        # just test that
153
        resp = self.client.post(self.keys_url + "/generate")
154
        self.assertNotEqual(resp, "")
155

    
156
        data = json.loads(resp.content)
157
        self.assertEqual('private' in data, True)
158
        self.assertEqual('private' in data, True)
159

    
160
        # public key is base64 encoded
161
        base64.b64decode(data['public'].replace("ssh-rsa ", ""))
162

    
163
        # remove header/footer
164
        private = "".join(data['private'].split("\n")[1:-1])
165

    
166
        # private key is base64 encoded
167
        base64.b64decode(private)
168

    
169
        new_key = PublicKeyPair()
170
        new_key.content = data['public']
171
        new_key.name = "new key"
172
        new_key.user = 'test'
173
        new_key.full_clean()
174
        new_key.save()
175

    
176
    def test_generate_limit(self):
177
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 1
178
        self.client.post(self.keys_url,
179
                         json.dumps({'name': 'key1',
180
                                     'content': """key 1 content"""}),
181
                         content_type='application/json')
182
        genpath = self.keys_url + "/generate"
183
        r = self.client.post(genpath)
184
        assert isinstance(r, http.HttpResponseServerError)
185

    
186
    def test_invalid_data(self):
187
        resp = self.client.post(self.keys_url,
188
                                json.dumps({'content': """key 2 content"""}),
189
                                content_type='application/json')
190

    
191
        self.assertEqual(resp.status_code, 500)
192
        self.assertEqual(resp.content,
193
                         """{"non_field_key": "__all__", "errors": """
194
                         """{"name": ["This field cannot be blank."]}}""")
195

    
196
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 2
197

    
198
        # test ssh limit
199
        resp = self.client.post(self.keys_url,
200
                                json.dumps({'name': 'key1',
201
                                            'content': """key 1 content"""}),
202
                                content_type='application/json')
203
        resp = self.client.post(self.keys_url,
204
                                json.dumps({'name': 'key1',
205
                                            'content': """key 1 content"""}),
206
                                content_type='application/json')
207
        resp = self.client.post(self.keys_url,
208
                                json.dumps({'name': 'key1',
209
                                            'content': """key 1 content"""}),
210
                                content_type='application/json')
211
        self.assertEqual(resp.status_code, 500)
212
        self.assertEqual(resp.content,
213
                         """{"non_field_key": "__all__", "errors": """
214
                         """{"__all__": ["SSH keys limit exceeded."]}}""")