root / snf-cyclades-app / synnefo / helpdesk / tests.py @ 8c7f5877
History | View | Annotate | Download (14.2 kB)
1 |
# Copyright 2011, 2012, 2013 GRNET S.A. All rights reserved.
|
---|---|
2 |
#
|
3 |
# Redistribution and use in source and binary forms, with or
|
4 |
# without modification, are permitted provided that the following
|
5 |
# conditions are met:
|
6 |
#
|
7 |
# 1. Redistributions of source code must retain the above
|
8 |
# copyright notice, this list of conditions and the following
|
9 |
# disclaimer.
|
10 |
#
|
11 |
# 2. Redistributions in binary form must reproduce the above
|
12 |
# copyright notice, this list of conditions and the following
|
13 |
# disclaimer in the documentation and/or other materials
|
14 |
# provided with the distribution.
|
15 |
#
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 |
# POSSIBILITY OF SUCH DAMAGE.
|
28 |
#
|
29 |
# The views and conclusions contained in the software and
|
30 |
# documentation are those of the authors and should not be
|
31 |
# interpreted as representing official policies, either expressed
|
32 |
# or implied, of GRNET S.A.
|
33 |
#
|
34 |
|
35 |
import mock |
36 |
|
37 |
from django.test import TestCase, Client |
38 |
from django.conf import settings |
39 |
from django.core.urlresolvers import reverse |
40 |
from snf_django.utils.testing import mocked_quotaholder |
41 |
|
42 |
|
43 |
USER1 = "5edcb5aa-1111-4146-a8ed-2b6287824353"
|
44 |
USER2 = "5edcb5aa-2222-4146-a8ed-2b6287824353"
|
45 |
|
46 |
USERS_UUIDS = {} |
47 |
USERS_UUIDS[USER1] = {'displayname': 'testuser@test.com'} |
48 |
USERS_UUIDS[USER2] = {'displayname': 'testuser2@test.com'} |
49 |
|
50 |
USERS_DISPLAYNAMES = dict(map(lambda k: (k[1]['displayname'], {'uuid': k[0]}), |
51 |
USERS_UUIDS.iteritems())) |
52 |
|
53 |
from synnefo.db import models_factory as mfactory |
54 |
|
55 |
|
56 |
class AstakosClientMock(): |
57 |
def __init__(*args, **kwargs): |
58 |
pass
|
59 |
|
60 |
def get_username(self, uuid): |
61 |
try:
|
62 |
return USERS_UUIDS.get(uuid)['displayname'] |
63 |
except TypeError: |
64 |
return None |
65 |
|
66 |
def get_uuid(self, display_name): |
67 |
try:
|
68 |
return USERS_DISPLAYNAMES.get(display_name)['uuid'] |
69 |
except TypeError: |
70 |
return None |
71 |
|
72 |
|
73 |
class AuthClient(Client): |
74 |
|
75 |
def request(self, **request): |
76 |
token = request.pop('user_token', '0000') |
77 |
if token:
|
78 |
request['HTTP_X_AUTH_TOKEN'] = token
|
79 |
return super(AuthClient, self).request(**request) |
80 |
|
81 |
|
82 |
def get_user_mock(request, *args, **kwargs): |
83 |
request.user_uniq = None
|
84 |
request.user = None
|
85 |
if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0000': |
86 |
request.user_uniq = 'test'
|
87 |
request.user = {"access": {
|
88 |
"token": {
|
89 |
"expires": "2013-06-19T15:23:59.975572+00:00", |
90 |
"id": "0000", |
91 |
"tenant": {
|
92 |
"id": "test", |
93 |
"name": "Firstname Lastname" |
94 |
} |
95 |
}, |
96 |
"serviceCatalog": [],
|
97 |
"user": {
|
98 |
"roles_links": [],
|
99 |
"id": "test", |
100 |
"roles": [{"id": 1, "name": "default"}], |
101 |
"name": "Firstname Lastname"}} |
102 |
} |
103 |
|
104 |
if request.META.get('HTTP_X_AUTH_TOKEN', None) == '0001': |
105 |
request.user_uniq = 'test'
|
106 |
request.user = {"access": {
|
107 |
"token": {
|
108 |
"expires": "2013-06-19T15:23:59.975572+00:00", |
109 |
"id": "0001", |
110 |
"tenant": {
|
111 |
"id": "test", |
112 |
"name": "Firstname Lastname" |
113 |
} |
114 |
}, |
115 |
"serviceCatalog": [],
|
116 |
"user": {
|
117 |
"roles_links": [],
|
118 |
"id": "test", |
119 |
"roles": [{"id": 1, "name": "default"}, |
120 |
{"id": 2, "name": "helpdesk"}], |
121 |
"name": "Firstname Lastname"}} |
122 |
} |
123 |
|
124 |
|
125 |
@mock.patch("astakosclient.AstakosClient", new=AstakosClientMock) |
126 |
@mock.patch("snf_django.lib.astakos.get_user", new=get_user_mock) |
127 |
class HelpdeskTests(TestCase): |
128 |
"""
|
129 |
Helpdesk tests. Test correctness of permissions and returned data.
|
130 |
"""
|
131 |
|
132 |
def setUp(self): |
133 |
settings.SKIP_SSH_VALIDATION = True
|
134 |
settings.HELPDESK_ENABLED = True
|
135 |
self.client = AuthClient()
|
136 |
|
137 |
# init models
|
138 |
vm1u1 = mfactory.VirtualMachineFactory(userid=USER1, name="user1 vm",
|
139 |
pk=1001)
|
140 |
vm1u2 = mfactory.VirtualMachineFactory(userid=USER2, name="user2 vm1",
|
141 |
pk=1002)
|
142 |
vm2u2 = mfactory.VirtualMachineFactory(userid=USER2, name="user2 vm2",
|
143 |
pk=1003)
|
144 |
|
145 |
nic1 = mfactory.NetworkInterfaceFactory(machine=vm1u2, |
146 |
userid=vm1u2.userid, |
147 |
network__public=False,
|
148 |
network__userid=USER1) |
149 |
ip2 = mfactory.IPv4AddressFactory(nic__machine=vm1u1, |
150 |
userid=vm1u1.userid, |
151 |
network__public=True,
|
152 |
network__userid=None,
|
153 |
address="195.251.222.211")
|
154 |
mfactory.IPAddressLogFactory(address=ip2.address, |
155 |
server_id=vm1u1.id, |
156 |
network_id=ip2.network.id, |
157 |
active=True)
|
158 |
|
159 |
def test_enabled_setting(self): |
160 |
settings.HELPDESK_ENABLED = False
|
161 |
|
162 |
# helpdesk is disabled
|
163 |
r = self.client.get(reverse('helpdesk-index'), user_token="0001") |
164 |
self.assertEqual(r.status_code, 404) |
165 |
r = self.client.get(reverse('helpdesk-details', |
166 |
args=['testuser@test.com']),
|
167 |
user_token="0001")
|
168 |
self.assertEqual(r.status_code, 404) |
169 |
|
170 |
def test_ip_lookup(self): |
171 |
# ip does not exist, proper message gets displayed
|
172 |
r = self.client.get(reverse('helpdesk-details', |
173 |
args=["195.251.221.122"]), user_token='0001') |
174 |
self.assertFalse(r.context["ip_exists"]) |
175 |
self.assertEqual(list(r.context["ips"]), []) |
176 |
|
177 |
# ip exists
|
178 |
r = self.client.get(reverse('helpdesk-details', |
179 |
args=["195.251.222.211"]), user_token='0001') |
180 |
self.assertTrue(r.context["ip_exists"]) |
181 |
ips = r.context["ips"]
|
182 |
for ip in ips: |
183 |
self.assertEqual(ip.address, "195.251.222.211") |
184 |
|
185 |
def test_vm_lookup(self): |
186 |
# vm id does not exist
|
187 |
r = self.client.get(reverse('helpdesk-details', |
188 |
args=["vm-123"]), user_token='0001') |
189 |
self.assertContains(r, 'User with Virtual Machine') |
190 |
|
191 |
# vm exists, 'test' account discovered
|
192 |
r = self.client.get(reverse('helpdesk-details', |
193 |
args=["vm1001"]), user_token='0001') |
194 |
self.assertEqual(r.context['account'], USER1) |
195 |
r = self.client.get(reverse('helpdesk-details', |
196 |
args=["vm1002"]), user_token='0001') |
197 |
self.assertEqual(r.context['account'], USER2) |
198 |
# dash also works
|
199 |
r = self.client.get(reverse('helpdesk-details', |
200 |
args=["vm-1002"]), user_token='0001') |
201 |
self.assertEqual(r.context['account'], USER2) |
202 |
|
203 |
def test_view_permissions(self): |
204 |
# anonymous user gets 403
|
205 |
r = self.client.get(reverse('helpdesk-index'), user_token=None) |
206 |
self.assertEqual(r.status_code, 403) |
207 |
r = self.client.get(reverse('helpdesk-details', |
208 |
args=['testuser@test.com']),
|
209 |
user_token=None)
|
210 |
self.assertEqual(r.status_code, 403) |
211 |
|
212 |
# user not in helpdesk group gets 403
|
213 |
r = self.client.get(reverse('helpdesk-index')) |
214 |
self.assertEqual(r.status_code, 403) |
215 |
r = self.client.get(reverse('helpdesk-details', |
216 |
args=['testuser@test.com']))
|
217 |
self.assertEqual(r.status_code, 403) |
218 |
|
219 |
# user with helpdesk group gets 200
|
220 |
r = self.client.get(reverse('helpdesk-index'), user_token="0001") |
221 |
self.assertEqual(r.status_code, 200) |
222 |
r = self.client.get(reverse('helpdesk-details', |
223 |
args=['testuser@test.com']),
|
224 |
user_token="0001")
|
225 |
self.assertEqual(r.status_code, 200) |
226 |
|
227 |
r = self.client.post(reverse('helpdesk-suspend-vm', args=(1001,))) |
228 |
r = self.client.get(reverse('helpdesk-suspend-vm', args=(1001,)), |
229 |
user_token="0001", data={'token': '1234'}) |
230 |
self.assertEqual(r.status_code, 403) |
231 |
r = self.client.get(reverse('helpdesk-suspend-vm', args=(1001,)), |
232 |
user_token="0001")
|
233 |
self.assertEqual(r.status_code, 403) |
234 |
r = self.client.post(reverse('helpdesk-suspend-vm', args=(1001,)), |
235 |
user_token="0001", data={'token': '0001'}) |
236 |
self.assertEqual(r.status_code, 302) |
237 |
r = self.client.post(reverse('helpdesk-suspend-vm', args=(1001,)), |
238 |
user_token="0000", data={'token': '0000'}) |
239 |
self.assertEqual(r.status_code, 403) |
240 |
|
241 |
def test_suspend_vm(self): |
242 |
r = self.client.get(reverse('helpdesk-details', |
243 |
args=['testuser@test.com']),
|
244 |
user_token="0001")
|
245 |
self.assertEqual(r.status_code, 200) |
246 |
vmid = r.context['vms'][0].pk |
247 |
r = self.client.post(reverse('helpdesk-suspend-vm', args=(vmid,)), |
248 |
data={'token': '0001'}, user_token="0001") |
249 |
self.assertEqual(r.status_code, 302) |
250 |
|
251 |
r = self.client.get(reverse('helpdesk-details', |
252 |
args=['testuser@test.com']),
|
253 |
user_token="0001")
|
254 |
self.assertTrue(r.context['vms'][0].suspended) |
255 |
|
256 |
r = self.client.post(reverse('helpdesk-suspend-vm-release', |
257 |
args=(vmid,)), data={'token': '0001'}, |
258 |
user_token="0001")
|
259 |
self.assertEqual(r.status_code, 302) |
260 |
r = self.client.get(reverse('helpdesk-details', |
261 |
args=['testuser@test.com']),
|
262 |
user_token="0001")
|
263 |
self.assertFalse(r.context['vms'][0].suspended) |
264 |
|
265 |
def test_results_get_filtered(self): |
266 |
"""
|
267 |
Test that view context data are filtered based on userid provided.
|
268 |
Check helpdesk_test.json to see the existing database data.
|
269 |
"""
|
270 |
|
271 |
# 'testuser@test.com' details, see
|
272 |
# helpdes/fixtures/helpdesk_test.json for more details
|
273 |
r = self.client.get(reverse('helpdesk-details', |
274 |
args=['testuser@test.com']),
|
275 |
user_token="0001")
|
276 |
account = r.context['account']
|
277 |
vms = r.context['vms']
|
278 |
nets = r.context['networks']
|
279 |
self.assertEqual(account, USER1)
|
280 |
self.assertEqual(vms[0].name, "user1 vm") |
281 |
self.assertEqual(vms.count(), 1) |
282 |
self.assertEqual(len(nets), 2) |
283 |
self.assertEqual(r.context['account_exists'], True) |
284 |
|
285 |
# 'testuser2@test.com' details, see helpdesk
|
286 |
# /fixtures/helpdesk_test.json for more details
|
287 |
r = self.client.get(reverse('helpdesk-details', |
288 |
args=['testuser2@test.com']),
|
289 |
user_token="0001")
|
290 |
account = r.context['account']
|
291 |
vms = r.context['vms']
|
292 |
nets = r.context['networks']
|
293 |
self.assertEqual(account, USER2)
|
294 |
self.assertEqual(vms.count(), 2) |
295 |
self.assertEqual(sorted([vms[0].name, vms[1].name]), |
296 |
sorted(["user2 vm1", "user2 vm2"])) |
297 |
self.assertEqual(len(nets), 0) |
298 |
self.assertEqual(r.context['account_exists'], True) |
299 |
|
300 |
# 'testuser5@test.com' does not exist, should be redirected to
|
301 |
# helpdesk home
|
302 |
r = self.client.get(reverse('helpdesk-details', |
303 |
args=['testuser5@test.com']),
|
304 |
user_token="0001")
|
305 |
vms = r.context['vms']
|
306 |
self.assertEqual(r.context['account_exists'], False) |
307 |
self.assertEqual(vms.count(), 0) |
308 |
|
309 |
def test_start_shutdown(self): |
310 |
from synnefo.logic import backend |
311 |
|
312 |
self.vm1 = mfactory.VirtualMachineFactory(userid=USER1)
|
313 |
pk = self.vm1.pk
|
314 |
|
315 |
r = self.client.post(reverse('helpdesk-vm-shutdown', args=(pk,))) |
316 |
self.assertEqual(r.status_code, 403) |
317 |
|
318 |
r = self.client.post(reverse('helpdesk-vm-shutdown', args=(pk,)), |
319 |
data={'token': '0001'}) |
320 |
self.assertEqual(r.status_code, 403) |
321 |
|
322 |
backend.shutdown_instance = shutdown = mock.Mock() |
323 |
shutdown.return_value = 1
|
324 |
self.vm1.operstate = 'STARTED' |
325 |
self.vm1.save()
|
326 |
with mocked_quotaholder():
|
327 |
r = self.client.post(reverse('helpdesk-vm-shutdown', args=(pk,)), |
328 |
data={'token': '0001'}, user_token='0001') |
329 |
self.assertEqual(r.status_code, 302) |
330 |
self.assertTrue(shutdown.called)
|
331 |
self.assertEqual(len(shutdown.mock_calls), 1) |
332 |
|
333 |
backend.startup_instance = startup = mock.Mock() |
334 |
startup.return_value = 2
|
335 |
self.vm1.operstate = 'STOPPED' |
336 |
self.vm1.save()
|
337 |
with mocked_quotaholder():
|
338 |
r = self.client.post(reverse('helpdesk-vm-start', args=(pk,)), |
339 |
data={'token': '0001'}, user_token='0001') |
340 |
self.assertEqual(r.status_code, 302) |
341 |
self.assertTrue(startup.called)
|
342 |
self.assertEqual(len(startup.mock_calls), 1) |