Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / admin-interface / tests.py @ c204fcff

History | View | Annotate | Download (14.5 kB)

1
# Copyright 2011, 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
#
34

    
35
import mock
36

    
37
from django.test import TestCase, Client
38
from django.conf import settings
39
from django.core.urlresolvers import reverse
40
from snf_django.utils.testing import mocked_quotaholder
41

    
42

    
43
USER1 = "5edcb5aa-1111-4146-a8ed-2b6287824353"
44
USER2 = "5edcb5aa-2222-4146-a8ed-2b6287824353"
45

    
46
USERS_UUIDS = {}
47
USERS_UUIDS[USER1] = {'displayname': 'testuser@test.com'}
48
USERS_UUIDS[USER2] = {'displayname': 'testuser2@test.com'}
49

    
50
USERS_DISPLAYNAMES = dict(map(lambda k: (k[1]['displayname'], {'uuid': k[0]}),
51
                          USERS_UUIDS.iteritems()))
52

    
53
from synnefo.db import models_factory as mfactory
54

    
55

    
56
class AstakosClientMock():
57
    def __init__(*args, **kwargs):
58
        pass
59

    
60
    def get_username(self, uuid):
61
        try:
62
            return USERS_UUIDS.get(uuid)['displayname']
63
        except TypeError:
64
            return None
65

    
66
    def get_uuid(self, display_name):
67
        try:
68
            return USERS_DISPLAYNAMES.get(display_name)['uuid']
69
        except TypeError:
70
            return None
71

    
72

    
73
class AuthClient(Client):
74

    
75
    def request(self, **request):
76
        token = request.pop('user_token', '0000')
77
        if token:
78
            request['HTTP_X_AUTH_TOKEN'] = token
79
        return super(AuthClient, self).request(**request)
80

    
81

    
82
def get_user_mock(request, *args, **kwargs):
83
    request.user_uniq = None
84
    request.user = None
85
    if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0000':
86
        request.user_uniq = 'test'
87
        request.user = {"access": {
88
                        "token": {
89
                            "expires": "2013-06-19T15:23:59.975572+00:00",
90
                            "id": "0000",
91
                            "tenant": {
92
                                "id": "test",
93
                                "name": "Firstname Lastname"
94
                                }
95
                            },
96
                        "serviceCatalog": [],
97
                        "user": {
98
                            "roles_links": [],
99
                            "id": "test",
100
                            "roles": [{"id": 1, "name": "default"}],
101
                            "name": "Firstname Lastname"}}
102
                        }
103

    
104
    if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0001':
105
        request.user_uniq = 'test'
106
        request.user = {"access": {
107
                        "token": {
108
                            "expires": "2013-06-19T15:23:59.975572+00:00",
109
                            "id": "0001",
110
                            "tenant": {
111
                                "id": "test",
112
                                "name": "Firstname Lastname"
113
                                }
114
                            },
115
                        "serviceCatalog": [],
116
                        "user": {
117
                            "roles_links": [],
118
                            "id": "test",
119
                            "roles": [{"id": 1, "name": "default"},
120
                                      {"id": 2, "name": "admin-interface"}],
121
                            "name": "Firstname Lastname"}}
122
                        }
123

    
124

    
125
@mock.patch("astakosclient.AstakosClient", new=AstakosClientMock)
126
@mock.patch("snf_django.lib.astakos.get_user", new=get_user_mock)
127
class Admin-InterfaceTests(TestCase):
128
    """
129
    Admin-Interface tests. Test correctness of permissions and returned data.
130
    """
131

    
132
    def setUp(self):
133
        settings.SKIP_SSH_VALIDATION = True
134
        settings.ADMIN-INTERFACE_ENABLED = True
135
        self.client = AuthClient()
136

    
137
        # init models
138
        vm1u1 = mfactory.VirtualMachineFactory(userid=USER1, name="user1 vm",
139
                                               pk=1001)
140
        vm1u2 = mfactory.VirtualMachineFactory(userid=USER2, name="user2 vm1",
141
                                               pk=1002)
142
        vm2u2 = mfactory.VirtualMachineFactory(userid=USER2, name="user2 vm2",
143
                                               pk=1003)
144

    
145
        nic1 = mfactory.NetworkInterfaceFactory(machine=vm1u2,
146
                                                userid=vm1u2.userid,
147
                                                network__public=False,
148
                                                network__userid=USER1)
149
        ip2 = mfactory.IPv4AddressFactory(nic__machine=vm1u1,
150
                                          userid=vm1u1.userid,
151
                                          network__public=True,
152
                                          network__userid=None,
153
                                          address="195.251.222.211")
154
        mfactory.IPAddressLogFactory(address=ip2.address,
155
                                     server_id=vm1u1.id,
156
                                     network_id=ip2.network.id,
157
                                     active=True)
158

    
159
    def test_enabled_setting(self):
160
        settings.ADMIN-INTERFACE_ENABLED = False
161

    
162
        # admin-interface is disabled
163
        r = self.client.get(reverse('admin-interface-index'), user_token="0001")
164
        self.assertEqual(r.status_code, 404)
165
        r = self.client.get(reverse('admin-interface-details',
166
                                    args=['testuser@test.com']),
167
                            user_token="0001")
168
        self.assertEqual(r.status_code, 404)
169

    
170
    def test_ip_lookup(self):
171
        # ip does not exist, proper message gets displayed
172
        r = self.client.get(reverse('admin-interface-details',
173
                            args=["195.251.221.122"]), user_token='0001')
174
        self.assertFalse(r.context["ip_exists"])
175
        self.assertEqual(list(r.context["ips"]), [])
176

    
177
        # ip exists
178
        r = self.client.get(reverse('admin-interface-details',
179
                            args=["195.251.222.211"]), user_token='0001')
180
        self.assertTrue(r.context["ip_exists"])
181
        ips = r.context["ips"]
182
        for ip in ips:
183
            self.assertEqual(ip.address, "195.251.222.211")
184

    
185
    def test_vm_lookup(self):
186
        # vm id does not exist
187
        r = self.client.get(reverse('admin-interface-details',
188
                            args=["vm-123"]), user_token='0001')
189
        self.assertContains(r, 'User with Virtual Machine')
190

    
191
        # vm exists, 'test' account discovered
192
        r = self.client.get(reverse('admin-interface-details',
193
                            args=["vm1001"]), user_token='0001')
194
        self.assertEqual(r.context['account'], USER1)
195
        r = self.client.get(reverse('admin-interface-details',
196
                            args=["vm1002"]), user_token='0001')
197
        self.assertEqual(r.context['account'], USER2)
198
        # dash also works
199
        r = self.client.get(reverse('admin-interface-details',
200
                            args=["vm-1002"]), user_token='0001')
201
        self.assertEqual(r.context['account'], USER2)
202

    
203
    def test_view_permissions(self):
204
        # anonymous user gets 403
205
        r = self.client.get(reverse('admin-interface-index'), user_token=None)
206
        self.assertEqual(r.status_code, 403)
207
        r = self.client.get(reverse('admin-interface-details',
208
                                    args=['testuser@test.com']),
209
                            user_token=None)
210
        self.assertEqual(r.status_code, 403)
211

    
212
        # user not in admin-interface group gets 403
213
        r = self.client.get(reverse('admin-interface-index'))
214
        self.assertEqual(r.status_code, 403)
215
        r = self.client.get(reverse('admin-interface-details',
216
                                    args=['testuser@test.com']))
217
        self.assertEqual(r.status_code, 403)
218

    
219
        # user with admin-interface group gets 200
220
        r = self.client.get(reverse('admin-interface-index'), user_token="0001")
221
        self.assertEqual(r.status_code, 200)
222
        r = self.client.get(reverse('admin-interface-details',
223
                                    args=['testuser@test.com']),
224
                            user_token="0001")
225
        self.assertEqual(r.status_code, 200)
226

    
227
        r = self.client.post(reverse('admin-interface-suspend-vm', args=(1001,)))
228
        r = self.client.get(reverse('admin-interface-suspend-vm', args=(1001,)),
229
                            user_token="0001", data={'token': '1234'})
230
        self.assertEqual(r.status_code, 403)
231
        r = self.client.get(reverse('admin-interface-suspend-vm', args=(1001,)),
232
                            user_token="0001")
233
        self.assertEqual(r.status_code, 403)
234
        r = self.client.post(reverse('admin-interface-suspend-vm', args=(1001,)),
235
                             user_token="0001", data={'token': '0001'})
236
        self.assertEqual(r.status_code, 302)
237
        r = self.client.post(reverse('admin-interface-suspend-vm', args=(1001,)),
238
                             user_token="0000", data={'token': '0000'})
239
        self.assertEqual(r.status_code, 403)
240

    
241
    def test_suspend_vm(self):
242
        r = self.client.get(reverse('admin-interface-details',
243
                                    args=['testuser@test.com']),
244
                            user_token="0001")
245
        self.assertEqual(r.status_code, 200)
246
        vmid = r.context['vms'][0].pk
247
        r = self.client.post(reverse('admin-interface-suspend-vm', args=(vmid,)),
248
                             data={'token': '0001'}, user_token="0001")
249
        self.assertEqual(r.status_code, 302)
250

    
251
        r = self.client.get(reverse('admin-interface-details',
252
                                    args=['testuser@test.com']),
253
                            user_token="0001")
254
        self.assertTrue(r.context['vms'][0].suspended)
255

    
256
        r = self.client.post(reverse('admin-interface-suspend-vm-release',
257
                                     args=(vmid,)), data={'token': '0001'},
258
                             user_token="0001")
259
        self.assertEqual(r.status_code, 302)
260
        r = self.client.get(reverse('admin-interface-details',
261
                                    args=['testuser@test.com']),
262
                            user_token="0001")
263
        self.assertFalse(r.context['vms'][0].suspended)
264

    
265
    def test_results_get_filtered(self):
266
        """
267
        Test that view context data are filtered based on userid provided.
268
        Check admin-interface_test.json to see the existing database data.
269
        """
270

    
271
        # 'testuser@test.com' details, see
272
        # helpdes/fixtures/admin-interface_test.json for more details
273
        r = self.client.get(reverse('admin-interface-details',
274
                                    args=['testuser@test.com']),
275
                            user_token="0001")
276
        account = r.context['account']
277
        vms = r.context['vms']
278
        nets = r.context['networks']
279
        self.assertEqual(account, USER1)
280
        self.assertEqual(vms[0].name, "user1 vm")
281
        self.assertEqual(vms.count(), 1)
282
        self.assertEqual(len(nets), 2)
283
        self.assertEqual(r.context['account_exists'], True)
284

    
285
        # 'testuser2@test.com' details, see admin-interface
286
        # /fixtures/admin-interface_test.json for more details
287
        r = self.client.get(reverse('admin-interface-details',
288
                                    args=['testuser2@test.com']),
289
                            user_token="0001")
290
        account = r.context['account']
291
        vms = r.context['vms']
292
        nets = r.context['networks']
293
        self.assertEqual(account, USER2)
294
        self.assertEqual(vms.count(), 2)
295
        self.assertEqual(sorted([vms[0].name, vms[1].name]),
296
                         sorted(["user2 vm1", "user2 vm2"]))
297
        self.assertEqual(len(nets), 0)
298
        self.assertEqual(r.context['account_exists'], True)
299

    
300
        # 'testuser5@test.com' does not exist, should be redirected to
301
        # admin-interface home
302
        r = self.client.get(reverse('admin-interface-details',
303
                                    args=['testuser5@test.com']),
304
                            user_token="0001")
305
        vms = r.context['vms']
306
        self.assertEqual(r.context['account_exists'], False)
307
        self.assertEqual(vms.count(), 0)
308

    
309
    def test_start_shutdown(self):
310
        from synnefo.logic import backend
311

    
312
        self.vm1 = mfactory.VirtualMachineFactory(userid=USER1)
313
        pk = self.vm1.pk
314

    
315
        r = self.client.post(reverse('admin-interface-vm-shutdown', args=(pk,)))
316
        self.assertEqual(r.status_code, 403)
317

    
318
        r = self.client.post(reverse('admin-interface-vm-shutdown', args=(pk,)),
319
                             data={'token': '0001'})
320
        self.assertEqual(r.status_code, 403)
321

    
322
        backend.shutdown_instance = shutdown = mock.Mock()
323
        shutdown.return_value = 1
324
        self.vm1.operstate = 'STARTED'
325
        self.vm1.save()
326
        with mocked_quotaholder():
327
            r = self.client.post(reverse('admin-interface-vm-shutdown', args=(pk,)),
328
                                 data={'token': '0001'}, user_token='0001')
329
        self.assertEqual(r.status_code, 302)
330
        self.assertTrue(shutdown.called)
331
        self.assertEqual(len(shutdown.mock_calls), 1)
332

    
333
        backend.startup_instance = startup = mock.Mock()
334
        startup.return_value = 2
335
        self.vm1.operstate = 'STOPPED'
336
        self.vm1.save()
337
        with mocked_quotaholder():
338
            r = self.client.post(reverse('admin-interface-vm-start', args=(pk,)),
339
                                 data={'token': '0001'}, user_token='0001')
340
        self.assertEqual(r.status_code, 302)
341
        self.assertTrue(startup.called)
342
        self.assertEqual(len(startup.mock_calls), 1)