Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / helpdesk / tests.py @ bbea0414

History | View | Annotate | Download (14.1 kB)

1
# Copyright 2011, 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
#
34

    
35
import mock
36

    
37
from django.test import TestCase, Client
38
from django.conf import settings
39
from django.core.urlresolvers import reverse
40

    
41

    
42
USER1 = "5edcb5aa-1111-4146-a8ed-2b6287824353"
43
USER2 = "5edcb5aa-2222-4146-a8ed-2b6287824353"
44

    
45
USERS_UUIDS = {}
46
USERS_UUIDS[USER1] = {'displayname': 'testuser@test.com'}
47
USERS_UUIDS[USER2] = {'displayname': 'testuser2@test.com'}
48

    
49
USERS_DISPLAYNAMES = dict(map(lambda k: (k[1]['displayname'], {'uuid': k[0]}),
50
                          USERS_UUIDS.iteritems()))
51

    
52
from synnefo.db import models_factory as mfactory
53

    
54

    
55
class AstakosClientMock():
56
    def __init__(*args, **kwargs):
57
        pass
58

    
59
    def get_username(self, uuid):
60
        try:
61
            return USERS_UUIDS.get(uuid)['displayname']
62
        except TypeError:
63
            return None
64

    
65
    def get_uuid(self, display_name):
66
        try:
67
            return USERS_DISPLAYNAMES.get(display_name)['uuid']
68
        except TypeError:
69
            return None
70

    
71

    
72
class AuthClient(Client):
73

    
74
    def request(self, **request):
75
        token = request.pop('user_token', '0000')
76
        if token:
77
            request['HTTP_X_AUTH_TOKEN'] = token
78
        return super(AuthClient, self).request(**request)
79

    
80

    
81
def get_user_mock(request, *args, **kwargs):
82
    request.user_uniq = None
83
    request.user = None
84
    if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0000':
85
        request.user_uniq = 'test'
86
        request.user = {"access": {
87
                        "token": {
88
                            "expires": "2013-06-19T15:23:59.975572+00:00",
89
                            "id": "0000",
90
                            "tenant": {
91
                                "id": "test",
92
                                "name": "Firstname Lastname"
93
                                }
94
                            },
95
                        "serviceCatalog": [],
96
                        "user": {
97
                            "roles_links": [],
98
                            "id": "test",
99
                            "roles": [{"id": 1, "name": "default"}],
100
                            "name": "Firstname Lastname"}}
101
                        }
102

    
103
    if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0001':
104
        request.user_uniq = 'test'
105
        request.user = {"access": {
106
                        "token": {
107
                            "expires": "2013-06-19T15:23:59.975572+00:00",
108
                            "id": "0001",
109
                            "tenant": {
110
                                "id": "test",
111
                                "name": "Firstname Lastname"
112
                                }
113
                            },
114
                        "serviceCatalog": [],
115
                        "user": {
116
                            "roles_links": [],
117
                            "id": "test",
118
                            "roles": [{"id": 1, "name": "default"},
119
                                      {"id": 2, "name": "helpdesk"}],
120
                            "name": "Firstname Lastname"}}
121
                        }
122

    
123

    
124
@mock.patch("astakosclient.AstakosClient", new=AstakosClientMock)
125
@mock.patch("snf_django.lib.astakos.get_user", new=get_user_mock)
126
class HelpdeskTests(TestCase):
127
    """
128
    Helpdesk tests. Test correctness of permissions and returned data.
129
    """
130

    
131
    def setUp(self):
132
        settings.SKIP_SSH_VALIDATION = True
133
        settings.HELPDESK_ENABLED = True
134
        self.client = AuthClient()
135

    
136
        # init models
137
        vm1u1 = mfactory.VirtualMachineFactory(userid=USER1, name="user1 vm",
138
                                               pk=1001)
139
        vm1u2 = mfactory.VirtualMachineFactory(userid=USER2, name="user2 vm1",
140
                                               pk=1002)
141
        vm2u2 = mfactory.VirtualMachineFactory(userid=USER2, name="user2 vm2",
142
                                               pk=1003)
143

    
144
        nic1 = mfactory.NetworkInterfaceFactory(machine=vm1u2,
145
                                                userid=vm1u2.userid,
146
                                                network__public=False,
147
                                                network__userid=USER1)
148
        ip2 = mfactory.IPv4AddressFactory(nic__machine=vm1u1,
149
                                          userid=vm1u1.userid,
150
                                          network__public=True,
151
                                          network__userid=None,
152
                                          address="195.251.222.211")
153
        mfactory.IPAddressLogFactory(address=ip2.address,
154
                                     server_id=vm1u1.id,
155
                                     network_id=ip2.network.id,
156
                                     active=True)
157

    
158
    def test_enabled_setting(self):
159
        settings.HELPDESK_ENABLED = False
160

    
161
        # helpdesk is disabled
162
        r = self.client.get(reverse('helpdesk-index'), user_token="0001")
163
        self.assertEqual(r.status_code, 404)
164
        r = self.client.get(reverse('helpdesk-details',
165
                                    args=['testuser@test.com']),
166
                            user_token="0001")
167
        self.assertEqual(r.status_code, 404)
168

    
169
    def test_ip_lookup(self):
170
        # ip does not exist, proper message gets displayed
171
        r = self.client.get(reverse('helpdesk-details',
172
                            args=["195.251.221.122"]), user_token='0001')
173
        self.assertFalse(r.context["ip_exists"])
174
        self.assertEqual(list(r.context["ips"]), [])
175

    
176
        # ip exists
177
        r = self.client.get(reverse('helpdesk-details',
178
                            args=["195.251.222.211"]), user_token='0001')
179
        self.assertTrue(r.context["ip_exists"])
180
        ips = r.context["ips"]
181
        for ip in ips:
182
            self.assertEqual(ip.address, "195.251.222.211")
183

    
184
    def test_vm_lookup(self):
185
        # vm id does not exist
186
        r = self.client.get(reverse('helpdesk-details',
187
                            args=["vm-123"]), user_token='0001')
188
        self.assertContains(r, 'User with Virtual Machine')
189

    
190
        # vm exists, 'test' account discovered
191
        r = self.client.get(reverse('helpdesk-details',
192
                            args=["vm1001"]), user_token='0001')
193
        self.assertEqual(r.context['account'], USER1)
194
        r = self.client.get(reverse('helpdesk-details',
195
                            args=["vm1002"]), user_token='0001')
196
        self.assertEqual(r.context['account'], USER2)
197
        # dash also works
198
        r = self.client.get(reverse('helpdesk-details',
199
                            args=["vm-1002"]), user_token='0001')
200
        self.assertEqual(r.context['account'], USER2)
201

    
202
    def test_view_permissions(self):
203
        # anonymous user gets 403
204
        r = self.client.get(reverse('helpdesk-index'), user_token=None)
205
        self.assertEqual(r.status_code, 403)
206
        r = self.client.get(reverse('helpdesk-details',
207
                                    args=['testuser@test.com']),
208
                            user_token=None)
209
        self.assertEqual(r.status_code, 403)
210

    
211
        # user not in helpdesk group gets 403
212
        r = self.client.get(reverse('helpdesk-index'))
213
        self.assertEqual(r.status_code, 403)
214
        r = self.client.get(reverse('helpdesk-details',
215
                                    args=['testuser@test.com']))
216
        self.assertEqual(r.status_code, 403)
217

    
218
        # user with helpdesk group gets 200
219
        r = self.client.get(reverse('helpdesk-index'), user_token="0001")
220
        self.assertEqual(r.status_code, 200)
221
        r = self.client.get(reverse('helpdesk-details',
222
                                    args=['testuser@test.com']),
223
                            user_token="0001")
224
        self.assertEqual(r.status_code, 200)
225

    
226
        r = self.client.post(reverse('helpdesk-suspend-vm', args=(1001,)))
227
        r = self.client.get(reverse('helpdesk-suspend-vm', args=(1001,)),
228
                            user_token="0001", data={'token': '1234'})
229
        self.assertEqual(r.status_code, 403)
230
        r = self.client.get(reverse('helpdesk-suspend-vm', args=(1001,)),
231
                            user_token="0001")
232
        self.assertEqual(r.status_code, 403)
233
        r = self.client.post(reverse('helpdesk-suspend-vm', args=(1001,)),
234
                             user_token="0001", data={'token': '0001'})
235
        self.assertEqual(r.status_code, 302)
236
        r = self.client.post(reverse('helpdesk-suspend-vm', args=(1001,)),
237
                             user_token="0000", data={'token': '0000'})
238
        self.assertEqual(r.status_code, 403)
239

    
240
    def test_suspend_vm(self):
241
        r = self.client.get(reverse('helpdesk-details',
242
                                    args=['testuser@test.com']),
243
                            user_token="0001")
244
        self.assertEqual(r.status_code, 200)
245
        vmid = r.context['vms'][0].pk
246
        r = self.client.post(reverse('helpdesk-suspend-vm', args=(vmid,)),
247
                             data={'token': '0001'}, user_token="0001")
248
        self.assertEqual(r.status_code, 302)
249

    
250
        r = self.client.get(reverse('helpdesk-details',
251
                                    args=['testuser@test.com']),
252
                            user_token="0001")
253
        self.assertTrue(r.context['vms'][0].suspended)
254

    
255
        r = self.client.post(reverse('helpdesk-suspend-vm-release',
256
                                     args=(vmid,)), data={'token': '0001'},
257
                             user_token="0001")
258
        self.assertEqual(r.status_code, 302)
259
        r = self.client.get(reverse('helpdesk-details',
260
                                    args=['testuser@test.com']),
261
                            user_token="0001")
262
        self.assertFalse(r.context['vms'][0].suspended)
263

    
264
    def test_results_get_filtered(self):
265
        """
266
        Test that view context data are filtered based on userid provided.
267
        Check helpdesk_test.json to see the existing database data.
268
        """
269

    
270
        # 'testuser@test.com' details, see
271
        # helpdes/fixtures/helpdesk_test.json for more details
272
        r = self.client.get(reverse('helpdesk-details',
273
                                    args=['testuser@test.com']),
274
                            user_token="0001")
275
        account = r.context['account']
276
        vms = r.context['vms']
277
        nets = r.context['networks']
278
        self.assertEqual(account, USER1)
279
        self.assertEqual(vms[0].name, "user1 vm")
280
        self.assertEqual(vms.count(), 1)
281
        self.assertEqual(len(nets), 2)
282
        self.assertEqual(r.context['account_exists'], True)
283

    
284
        # 'testuser2@test.com' details, see helpdesk
285
        # /fixtures/helpdesk_test.json for more details
286
        r = self.client.get(reverse('helpdesk-details',
287
                                    args=['testuser2@test.com']),
288
                            user_token="0001")
289
        account = r.context['account']
290
        vms = r.context['vms']
291
        nets = r.context['networks']
292
        self.assertEqual(account, USER2)
293
        self.assertEqual(vms.count(), 2)
294
        self.assertEqual(sorted([vms[0].name, vms[1].name]),
295
                         sorted(["user2 vm1", "user2 vm2"]))
296
        self.assertEqual(len(nets), 0)
297
        self.assertEqual(r.context['account_exists'], True)
298

    
299
        # 'testuser5@test.com' does not exist, should be redirected to
300
        # helpdesk home
301
        r = self.client.get(reverse('helpdesk-details',
302
                                    args=['testuser5@test.com']),
303
                            user_token="0001")
304
        vms = r.context['vms']
305
        self.assertEqual(r.context['account_exists'], False)
306
        self.assertEqual(vms.count(), 0)
307

    
308
    def test_start_shutdown(self):
309
        from synnefo.logic import backend
310

    
311
        self.vm1 = mfactory.VirtualMachineFactory(userid=USER1)
312
        pk = self.vm1.pk
313

    
314
        r = self.client.post(reverse('helpdesk-vm-shutdown', args=(pk,)))
315
        self.assertEqual(r.status_code, 403)
316

    
317
        r = self.client.post(reverse('helpdesk-vm-shutdown', args=(pk,)),
318
                             data={'token': '0001'})
319
        self.assertEqual(r.status_code, 403)
320

    
321
        backend.shutdown_instance = shutdown = mock.Mock()
322
        shutdown.return_value = 1
323
        self.vm1.operstate = 'STARTED'
324
        self.vm1.save()
325
        r = self.client.post(reverse('helpdesk-vm-shutdown', args=(pk,)),
326
                             data={'token': '0001'}, user_token='0001')
327
        self.assertEqual(r.status_code, 302)
328
        self.assertTrue(shutdown.called)
329
        self.assertEqual(len(shutdown.mock_calls), 1)
330

    
331
        backend.startup_instance = startup = mock.Mock()
332
        startup.return_value = 2
333
        self.vm1.operstate = 'STOPPED'
334
        self.vm1.save()
335
        r = self.client.post(reverse('helpdesk-vm-start', args=(pk,)),
336
                             data={'token': '0001'}, user_token='0001')
337
        self.assertEqual(r.status_code, 302)
338
        self.assertTrue(startup.called)
339
        self.assertEqual(len(startup.mock_calls), 1)