Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / userdata / tests.py @ d495600d

History | View | Annotate | Download (8.4 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
#
34

    
35
from django import http
36
from django.test import TransactionTestCase
37
from django.conf import settings
38
from django.test.client import Client
39
from django.core.urlresolvers import clear_url_caches
40
from django.utils import simplejson as json
41
from django.conf import settings
42
from django.core.urlresolvers import reverse
43
from mock import patch
44

    
45
from synnefo.userdata.models import *
46

    
47

    
48
def get_user_mock(request, *args, **kwargs):
49
    if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0000':
50
        request.user_uniq = 'test'
51
        request.user = {'id': 'id',
52
                        'username': 'username',
53
                        'uuid': 'test'}
54

    
55

    
56
class AaiClient(Client):
57

    
58
    def request(self, **request):
59
        # mock the astakos authentication function
60
        with patch("synnefo.userdata.rest.get_user",
61
                   new=get_user_mock):
62
            with patch("synnefo.userdata.views.get_user",
63
                       new=get_user_mock):
64
                request['HTTP_X_AUTH_TOKEN'] = '0000'
65
                return super(AaiClient, self).request(**request)
66

    
67

    
68
class TestRestViews(TransactionTestCase):
69

    
70
    fixtures = ['users']
71

    
72
    def setUp(self):
73
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 10
74

    
75
        settings.SKIP_SSH_VALIDATION = True
76
        self.client = AaiClient()
77
        self.user = 'test'
78
        self.keys_url = reverse('ui_keys_collection')
79

    
80
    def test_keys_collection_get(self):
81
        resp = self.client.get(self.keys_url)
82
        self.assertEqual(resp.content, "[]")
83

    
84
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
85
                content="content1")
86

    
87
        resp = self.client.get(self.keys_url)
88
        resp_list = json.loads(resp.content);
89
        exp_list = [{"content": "content1", "id": 1,
90
                    "uri": self.keys_url + "/1", "name": "key pair 1",
91
                    "fingerprint": "unknown fingerprint"}]
92
        self.assertEqual(resp_list, exp_list)
93

    
94
        PublicKeyPair.objects.create(user=self.user, name="key pair 2",
95
                content="content2")
96

    
97
        resp = self.client.get(self.keys_url)
98
        resp_list = json.loads(resp.content)
99
        exp_list = [{"content": "content1", "id": 1,
100
                     "uri": self.keys_url + "/1", "name": "key pair 1",
101
                     "fingerprint": "unknown fingerprint"},
102
                    {"content": "content2", "id": 2,
103
                     "uri": self.keys_url + "/2",
104
                     "name": "key pair 2",
105
                     "fingerprint": "unknown fingerprint"}]
106

    
107
        self.assertEqual(resp_list, exp_list)
108

    
109
    def test_keys_resourse_get(self):
110
        resp = self.client.get(self.keys_url + "/1")
111
        self.assertEqual(resp.status_code, 404)
112

    
113
        # create a public key
114
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
115
                content="content1")
116
        resp = self.client.get(self.keys_url + "/1")
117
        resp_dict = json.loads(resp.content);
118
        exp_dict = {"content": "content1", "id": 1,
119
                    "uri": self.keys_url + "/1", "name": "key pair 1",
120
                    "fingerprint": "unknown fingerprint"}
121
        self.assertEqual(resp_dict, exp_dict)
122

    
123
        # update
124
        resp = self.client.put(self.keys_url + "/1",
125
                               json.dumps({'name':'key pair 1 new name'}),
126
                               content_type='application/json')
127

    
128
        pk = PublicKeyPair.objects.get(pk=1)
129
        self.assertEqual(pk.name, "key pair 1 new name")
130

    
131
        # delete
132
        resp = self.client.delete(self.keys_url + "/1")
133
        self.assertEqual(PublicKeyPair.objects.count(), 0)
134

    
135
        resp = self.client.get(self.keys_url + "/1")
136
        self.assertEqual(resp.status_code, 404)
137

    
138
        resp = self.client.get(self.keys_url)
139
        self.assertEqual(resp.content, "[]")
140

    
141
        # test rest create
142
        resp = self.client.post(self.keys_url,
143
                                json.dumps({'name':'key pair 2',
144
                                            'content':"""key 2 content"""}),
145
                                content_type='application/json')
146
        self.assertEqual(PublicKeyPair.objects.count(), 1)
147
        pk = PublicKeyPair.objects.get()
148
        self.assertEqual(pk.name, "key pair 2")
149
        self.assertEqual(pk.content, "key 2 content")
150

    
151
    def test_generate_views(self):
152
        import base64
153

    
154
        # just test that
155
        resp = self.client.post(self.keys_url + "/generate")
156
        self.assertNotEqual(resp, "")
157

    
158
        data = json.loads(resp.content)
159
        self.assertEqual(data.has_key('private'), True)
160
        self.assertEqual(data.has_key('private'), True)
161

    
162
        # public key is base64 encoded
163
        base64.b64decode(data['public'].replace("ssh-rsa ",""))
164

    
165
        # remove header/footer
166
        private = "".join(data['private'].split("\n")[1:-1])
167

    
168
        # private key is base64 encoded
169
        base64.b64decode(private)
170

    
171
        new_key = PublicKeyPair()
172
        new_key.content = data['public']
173
        new_key.name = "new key"
174
        new_key.user = 'test'
175
        new_key.full_clean()
176
        new_key.save()
177

    
178
    def test_generate_limit(self):
179
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 1
180
        resp = self.client.post(self.keys_url,
181
                                json.dumps({'name':'key1',
182
                                            'content':"""key 1 content"""}),
183
                                content_type='application/json')
184
        genpath = self.keys_url + "/generate"
185
        r = self.client.post(genpath)
186
        assert isinstance(r, http.HttpResponseServerError)
187

    
188
    def test_invalid_data(self):
189
        resp = self.client.post(self.keys_url,
190
                                json.dumps({'content':"""key 2 content"""}),
191
                                content_type='application/json')
192

    
193
        self.assertEqual(resp.status_code, 500)
194
        self.assertEqual(resp.content, """{"non_field_key": "__all__", "errors": """
195
                                       """{"name": ["This field cannot be blank."]}}""")
196

    
197
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 2
198

    
199
        # test ssh limit
200
        resp = self.client.post(self.keys_url,
201
                                json.dumps({'name':'key1',
202
                                            'content':"""key 1 content"""}),
203
                                content_type='application/json')
204
        resp = self.client.post(self.keys_url,
205
                                json.dumps({'name':'key1',
206
                                            'content':"""key 1 content"""}),
207
                                content_type='application/json')
208
        resp = self.client.post(self.keys_url,
209
                                json.dumps({'name':'key1',
210
                                            'content':"""key 1 content"""}),
211
                                content_type='application/json')
212
        self.assertEqual(resp.status_code, 500)
213
        self.assertEqual(resp.content, """{"non_field_key": "__all__", "errors": """
214
                                       """{"__all__": ["SSH keys limit exceeded."]}}""")
215