Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / userdata / tests.py @ b4b82ec4

History | View | Annotate | Download (9 kB)

1
# Copyright 2011, 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
#
34

    
35
from django import http
36
from django.test import TransactionTestCase
37
from django.conf import settings
38
from django.test.client import Client
39
from django.utils import simplejson as json
40
from django.core.urlresolvers import reverse
41
from mock import patch
42

    
43
from synnefo.userdata.models import PublicKeyPair
44

    
45

    
46
def get_user_mock(request, *args, **kwargs):
47
    if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0000':
48
        request.user_uniq = 'test'
49
        request.user = {"access": {
50
                        "token": {
51
                            "expires": "2013-06-19T15:23:59.975572+00:00",
52
                            "id": "token",
53
                            "tenant": {
54
                                "id": "test",
55
                                "name": "Firstname Lastname"
56
                                }
57
                            },
58
                        "serviceCatalog": [],
59
                        "user": {
60
                            "roles_links": [],
61
                            "id": "test",
62
                            "roles": [{"id": 1, "name": "default"}],
63
                            "name": "Firstname Lastname"}}
64
                        }
65

    
66

    
67
class AaiClient(Client):
68

    
69
    def request(self, **request):
70
        # mock the astakos authentication function
71
        with patch("synnefo.userdata.rest.get_user",
72
                   new=get_user_mock):
73
            with patch("synnefo.userdata.views.get_user",
74
                       new=get_user_mock):
75
                request['HTTP_X_AUTH_TOKEN'] = '0000'
76
                return super(AaiClient, self).request(**request)
77

    
78

    
79
class TestRestViews(TransactionTestCase):
80

    
81
    fixtures = ['users']
82

    
83
    def setUp(self):
84
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 10
85

    
86
        settings.SKIP_SSH_VALIDATION = True
87
        self.client = AaiClient()
88
        self.user = 'test'
89
        self.keys_url = reverse('ui_keys_collection')
90

    
91
    def test_keys_collection_get(self):
92
        resp = self.client.get(self.keys_url)
93
        self.assertEqual(resp.content, "[]")
94

    
95
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
96
                                     content="content1")
97

    
98
        resp = self.client.get(self.keys_url)
99
        resp_list = json.loads(resp.content)
100
        exp_list = [{"content": "content1", "id": 1,
101
                     "uri": self.keys_url + "/1", "name": "key pair 1",
102
                     "fingerprint": "unknown fingerprint"}]
103
        self.assertEqual(resp_list, exp_list)
104

    
105
        PublicKeyPair.objects.create(user=self.user, name="key pair 2",
106
                                     content="content2")
107

    
108
        resp = self.client.get(self.keys_url)
109
        resp_list = json.loads(resp.content)
110
        exp_list = [{"content": "content1", "id": 1,
111
                     "uri": self.keys_url + "/1", "name": "key pair 1",
112
                     "fingerprint": "unknown fingerprint"},
113
                    {"content": "content2", "id": 2,
114
                     "uri": self.keys_url + "/2",
115
                     "name": "key pair 2",
116
                     "fingerprint": "unknown fingerprint"}]
117

    
118
        self.assertEqual(resp_list, exp_list)
119

    
120
    def test_keys_resourse_get(self):
121
        resp = self.client.get(self.keys_url + "/1")
122
        self.assertEqual(resp.status_code, 404)
123

    
124
        # create a public key
125
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
126
                                     content="content1")
127
        resp = self.client.get(self.keys_url + "/1")
128
        resp_dict = json.loads(resp.content)
129
        exp_dict = {"content": "content1", "id": 1,
130
                    "uri": self.keys_url + "/1", "name": "key pair 1",
131
                    "fingerprint": "unknown fingerprint"}
132
        self.assertEqual(resp_dict, exp_dict)
133

    
134
        # update
135
        resp = self.client.put(self.keys_url + "/1",
136
                               json.dumps({'name': 'key pair 1 new name'}),
137
                               content_type='application/json')
138

    
139
        pk = PublicKeyPair.objects.get(pk=1)
140
        self.assertEqual(pk.name, "key pair 1 new name")
141

    
142
        # delete
143
        resp = self.client.delete(self.keys_url + "/1")
144
        self.assertEqual(PublicKeyPair.objects.count(), 0)
145

    
146
        resp = self.client.get(self.keys_url + "/1")
147
        self.assertEqual(resp.status_code, 404)
148

    
149
        resp = self.client.get(self.keys_url)
150
        self.assertEqual(resp.content, "[]")
151

    
152
        # test rest create
153
        resp = self.client.post(self.keys_url,
154
                                json.dumps({'name': 'key pair 2',
155
                                            'content': """key 2 content"""}),
156
                                content_type='application/json')
157
        self.assertEqual(PublicKeyPair.objects.count(), 1)
158
        pk = PublicKeyPair.objects.get()
159
        self.assertEqual(pk.name, "key pair 2")
160
        self.assertEqual(pk.content, "key 2 content")
161

    
162
    def test_generate_views(self):
163
        import base64
164

    
165
        # just test that
166
        resp = self.client.post(self.keys_url + "/generate")
167
        self.assertNotEqual(resp, "")
168

    
169
        data = json.loads(resp.content)
170
        self.assertEqual('private' in data, True)
171
        self.assertEqual('private' in data, True)
172

    
173
        # public key is base64 encoded
174
        base64.b64decode(data['public'].replace("ssh-rsa ", ""))
175

    
176
        # remove header/footer
177
        private = "".join(data['private'].split("\n")[1:-1])
178

    
179
        # private key is base64 encoded
180
        base64.b64decode(private)
181

    
182
        new_key = PublicKeyPair()
183
        new_key.content = data['public']
184
        new_key.name = "new key"
185
        new_key.user = 'test'
186
        new_key.full_clean()
187
        new_key.save()
188

    
189
    def test_generate_limit(self):
190
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 1
191
        self.client.post(self.keys_url,
192
                         json.dumps({'name': 'key1',
193
                                     'content': """key 1 content"""}),
194
                         content_type='application/json')
195
        genpath = self.keys_url + "/generate"
196
        r = self.client.post(genpath)
197
        assert isinstance(r, http.HttpResponseServerError)
198

    
199
    def test_invalid_data(self):
200
        resp = self.client.post(self.keys_url,
201
                                json.dumps({'content': """key 2 content"""}),
202
                                content_type='application/json')
203

    
204
        self.assertEqual(resp.status_code, 500)
205
        self.assertEqual(resp.content,
206
                         """{"non_field_key": "__all__", "errors": """
207
                         """{"name": ["This field cannot be blank."]}}""")
208

    
209
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 2
210

    
211
        # test ssh limit
212
        resp = self.client.post(self.keys_url,
213
                                json.dumps({'name': 'key1',
214
                                            'content': """key 1 content"""}),
215
                                content_type='application/json')
216
        resp = self.client.post(self.keys_url,
217
                                json.dumps({'name': 'key1',
218
                                            'content': """key 1 content"""}),
219
                                content_type='application/json')
220
        resp = self.client.post(self.keys_url,
221
                                json.dumps({'name': 'key1',
222
                                            'content': """key 1 content"""}),
223
                                content_type='application/json')
224
        self.assertEqual(resp.status_code, 500)
225
        self.assertEqual(resp.content,
226
                         """{"non_field_key": "__all__", "errors": """
227
                         """{"__all__": ["SSH keys limit exceeded."]}}""")