Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / userdata / tests.py @ cddc2b2f

History | View | Annotate | Download (7.7 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
#
34

    
35
from django.test import TransactionTestCase
36
from django.conf import settings
37
from django.test.client import Client
38
from django.core.urlresolvers import clear_url_caches
39
from django.utils import simplejson as json
40
from django.conf import settings
41
from django.core.urlresolvers import reverse
42

    
43
from synnefo.userdata.models import *
44

    
45

    
46
class AaiClient(Client):
47

    
48
    def request(self, **request):
49
        request['HTTP_X_AUTH_TOKEN'] = '0000'
50
        return super(AaiClient, self).request(**request)
51

    
52
class TestRestViews(TransactionTestCase):
53

    
54
    fixtures = ['users']
55

    
56
    def setUp(self):
57
        def get_user_mock(request, *Args, **kwargs):
58
            if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0000':
59
                request.user_uniq = 'test'
60
                request.user = {'uniq': 'test'}
61

    
62
        # mock the astakos authentication function
63
        from snf_django.lib import astakos
64
        astakos.get_user = get_user_mock
65

    
66
        settings.SKIP_SSH_VALIDATION = True
67
        self.client = AaiClient()
68
        self.user = 'test'
69
        self.keys_url = reverse('ui_keys_collection')
70

    
71
    def test_keys_collection_get(self):
72
        resp = self.client.get(self.keys_url)
73
        self.assertEqual(resp.content, "[]")
74

    
75
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
76
                content="content1")
77

    
78
        resp = self.client.get(self.keys_url)
79
        resp_list = json.loads(resp.content);
80
        exp_list = [{"content": "content1", "id": 1,
81
                    "uri": self.keys_url + "/1", "name": "key pair 1",
82
                    "fingerprint": "unknown fingerprint"}]
83
        self.assertEqual(resp_list, exp_list)
84

    
85
        PublicKeyPair.objects.create(user=self.user, name="key pair 2",
86
                content="content2")
87

    
88
        resp = self.client.get(self.keys_url)
89
        resp_list = json.loads(resp.content)
90
        exp_list = [{"content": "content1", "id": 1,
91
                     "uri": self.keys_url + "/1", "name": "key pair 1",
92
                     "fingerprint": "unknown fingerprint"},
93
                    {"content": "content2", "id": 2,
94
                     "uri": self.keys_url + "/2",
95
                     "name": "key pair 2",
96
                     "fingerprint": "unknown fingerprint"}]
97

    
98
        self.assertEqual(resp_list, exp_list)
99

    
100
    def test_keys_resourse_get(self):
101
        resp = self.client.get(self.keys_url + "/1")
102
        self.assertEqual(resp.status_code, 404)
103

    
104
        # create a public key
105
        PublicKeyPair.objects.create(user=self.user, name="key pair 1",
106
                content="content1")
107
        resp = self.client.get(self.keys_url + "/1")
108
        resp_dict = json.loads(resp.content);
109
        exp_dict = {"content": "content1", "id": 1,
110
                    "uri": self.keys_url + "/1", "name": "key pair 1",
111
                    "fingerprint": "unknown fingerprint"}
112
        self.assertEqual(resp_dict, exp_dict)
113

    
114
        # update
115
        resp = self.client.put(self.keys_url + "/1",
116
                               json.dumps({'name':'key pair 1 new name'}),
117
                               content_type='application/json')
118

    
119
        pk = PublicKeyPair.objects.get(pk=1)
120
        self.assertEqual(pk.name, "key pair 1 new name")
121

    
122
        # delete
123
        resp = self.client.delete(self.keys_url + "/1")
124
        self.assertEqual(PublicKeyPair.objects.count(), 0)
125

    
126
        resp = self.client.get(self.keys_url + "/1")
127
        self.assertEqual(resp.status_code, 404)
128

    
129
        resp = self.client.get(self.keys_url)
130
        self.assertEqual(resp.content, "[]")
131

    
132
        # test rest create
133
        resp = self.client.post(self.keys_url,
134
                                json.dumps({'name':'key pair 2',
135
                                            'content':"""key 2 content"""}),
136
                                content_type='application/json')
137
        self.assertEqual(PublicKeyPair.objects.count(), 1)
138
        pk = PublicKeyPair.objects.get(pk=1)
139
        self.assertEqual(pk.name, "key pair 2")
140
        self.assertEqual(pk.content, "key 2 content")
141

    
142
    def test_generate_views(self):
143
        import base64
144

    
145
        # just test that
146
        resp = self.client.post(self.keys_url + "/generate")
147
        self.assertNotEqual(resp, "")
148

    
149
        data = json.loads(resp.content)
150
        self.assertEqual(data.has_key('private'), True)
151
        self.assertEqual(data.has_key('private'), True)
152

    
153
        # public key is base64 encoded
154
        base64.b64decode(data['public'].replace("ssh-rsa ",""))
155

    
156
        # remove header/footer
157
        private = "".join(data['private'].split("\n")[1:-1])
158

    
159
        # private key is base64 encoded
160
        base64.b64decode(private)
161

    
162
        new_key = PublicKeyPair()
163
        new_key.content = data['public']
164
        new_key.name = "new key"
165
        new_key.user = 'test'
166
        new_key.full_clean()
167
        new_key.save()
168

    
169
    def test_invalid_data(self):
170
        resp = self.client.post(self.keys_url,
171
                                json.dumps({'content':"""key 2 content"""}),
172
                                content_type='application/json')
173

    
174
        self.assertEqual(resp.status_code, 500)
175
        self.assertEqual(resp.content, """{"non_field_key": "__all__", "errors": """
176
                                       """{"name": ["This field cannot be blank."]}}""")
177

    
178
        settings.USERDATA_MAX_SSH_KEYS_PER_USER = 2
179

    
180
        # test ssh limit
181
        resp = self.client.post(self.keys_url,
182
                                json.dumps({'name':'key1',
183
                                            'content':"""key 1 content"""}),
184
                                content_type='application/json')
185
        resp = self.client.post(self.keys_url,
186
                                json.dumps({'name':'key1',
187
                                            'content':"""key 1 content"""}),
188
                                content_type='application/json')
189
        resp = self.client.post(self.keys_url,
190
                                json.dumps({'name':'key1',
191
                                            'content':"""key 1 content"""}),
192
                                content_type='application/json')
193
        self.assertEqual(resp.status_code, 500)
194
        self.assertEqual(resp.content, """{"non_field_key": "__all__", "errors": """
195
                                       """{"__all__": ["SSH keys limit exceeded."]}}""")
196