Revision d86c3c7d kamaki/clients/test/image.py
b/kamaki/clients/test/image.py | ||
---|---|---|
31 | 31 |
# interpreted as representing official policies, either expressed |
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 |
from mock import patch |
|
34 | 35 |
import time |
35 | 36 |
|
36 |
from kamaki.clients import livetest |
|
37 |
from kamaki.clients.cyclades import CycladesClient |
|
38 |
from kamaki.clients.image import ImageClient |
|
37 |
from unittest import TestCase |
|
39 | 38 |
from kamaki.clients import ClientError |
40 | 39 |
|
40 |
example_images = [ |
|
41 |
{ |
|
42 |
"status": "available", |
|
43 |
"name": "Archlinux", |
|
44 |
"disk_format": "diskdump", |
|
45 |
"container_format": "bare", |
|
46 |
"id": "b4713f20-3a41-4eaf-81ae-88698c18b3e8", |
|
47 |
"size": 752782848}, |
|
48 |
{ |
|
49 |
"status": "available", |
|
50 |
"name": "Debian_Wheezy_Base", |
|
51 |
"disk_format": "diskdump", |
|
52 |
"container_format": "bare", |
|
53 |
"id": "1f8454f0-8e3e-4b6c-ab8e-5236b728dffe", |
|
54 |
"size": 795107328}, |
|
55 |
{ |
|
56 |
"status": "available", |
|
57 |
"name": "maelstrom", |
|
58 |
"disk_format": "diskdump", |
|
59 |
"container_format": "bare", |
|
60 |
"id": "0fb03e45-7d5a-4515-bd4e-e6bbf6457f06", |
|
61 |
"size": 2583195644}, |
|
62 |
{ |
|
63 |
"status": "available", |
|
64 |
"name": "Gardenia", |
|
65 |
"disk_format": "diskdump", |
|
66 |
"container_format": "bare", |
|
67 |
"id": "5963020b-ab74-4e11-bc59-90c494bbdedb", |
|
68 |
"size": 2589802496}] |
|
69 |
|
|
70 |
class Image(TestCase): |
|
71 |
|
|
72 |
class FR(object): |
|
73 |
json = example_images |
|
74 |
headers = {} |
|
75 |
content = json |
|
76 |
status = None |
|
77 |
status_code = 200 |
|
41 | 78 |
|
42 |
class Image(livetest.Generic): |
|
43 | 79 |
def setUp(self): |
44 | 80 |
self.now = time.mktime(time.gmtime()) |
45 |
|
|
46 | 81 |
self.imgname = 'img_%s' % self.now |
47 |
url = self['image', 'url'] |
|
48 |
self.client = ImageClient(url, self['token']) |
|
49 |
cyclades_url = self['compute', 'url'] |
|
50 |
self.cyclades = CycladesClient(cyclades_url, self['token']) |
|
51 |
self._imglist = {} |
|
52 |
|
|
53 |
def test_000(self): |
|
54 |
self._prepare_img() |
|
55 |
super(self.__class__, self).test_000() |
|
56 |
|
|
57 |
def _prepare_img(self): |
|
58 |
f = open(self['image', 'local_path'], 'rb') |
|
59 |
(token, uuid) = (self['token'], self['store', 'account']) |
|
60 |
if not uuid: |
|
61 |
from kamaki.clients.astakos import AstakosClient |
|
62 |
uuid = AstakosClient(self['astakos', 'url'], token).term('uuid') |
|
63 |
from kamaki.clients.pithos import PithosClient |
|
64 |
self.pithcli = PithosClient(self['store', 'url'], token, uuid) |
|
65 |
cont = 'cont_%s' % self.now |
|
66 |
self.pithcli.container = cont |
|
67 |
self.obj = 'obj_%s' % self.now |
|
68 |
print('\t- Create container %s on Pithos server' % cont) |
|
69 |
self.pithcli.container_put() |
|
70 |
self.location = 'pithos://%s/%s/%s' % (uuid, cont, self.obj) |
|
71 |
print('\t- Upload an image at %s...\n' % self.location) |
|
72 |
self.pithcli.upload_object(self.obj, f) |
|
73 |
print('\t- ok') |
|
74 |
f.close() |
|
75 |
|
|
76 |
self.client.register( |
|
77 |
self.imgname, |
|
78 |
self.location, |
|
79 |
params=dict(is_public=True)) |
|
80 |
img = self._get_img_by_name(self.imgname) |
|
81 |
self._imglist[self.imgname] = img |
|
82 |
self.url = 'http://image.example.com' |
|
83 |
self.token = 'an1m@g370k3n==' |
|
84 |
from kamaki.clients.image import ImageClient |
|
85 |
self.client = ImageClient(self.url, self.token) |
|
86 |
self.cyclades_url = 'http://cyclades.example.com' |
|
87 |
from kamaki.clients.cyclades import CycladesClient |
|
88 |
self.cyclades = CycladesClient(self.cyclades_url, self.token) |
|
89 |
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection |
|
90 |
self.C = KamakiHTTPConnection |
|
82 | 91 |
|
83 | 92 |
def tearDown(self): |
84 |
for img in self._imglist.values(): |
|
85 |
print('\tDeleting image %s' % img['id']) |
|
86 |
self.cyclades.delete_image(img['id']) |
|
87 |
if hasattr(self, 'pithcli'): |
|
88 |
print('\tDeleting container %s' % self.pithcli.container) |
|
89 |
try: |
|
90 |
self.pithcli.del_container(delimiter='/') |
|
91 |
self.pithcli.purge_container() |
|
92 |
except ClientError: |
|
93 |
pass |
|
94 |
|
|
95 |
def _get_img_by_name(self, name): |
|
96 |
r = self.cyclades.list_images() |
|
97 |
for img in r: |
|
98 |
if img['name'] == name: |
|
99 |
return img |
|
100 |
return None |
|
93 |
self.FR.json = example_images |
|
101 | 94 |
|
102 | 95 |
def assert_dicts_are_deeply_equal(self, d1, d2): |
103 | 96 |
for k, v in d1.items(): |
... | ... | |
108 | 101 |
self.assertEqual(unicode(v), unicode(d2[k])) |
109 | 102 |
|
110 | 103 |
def test_list_public(self): |
111 |
"""Test list_public""" |
|
112 |
self._test_list_public() |
|
113 |
|
|
114 |
def _test_list_public(self): |
|
115 |
r = self.client.list_public() |
|
116 |
r0 = self.client.list_public(order='-') |
|
117 |
self.assertTrue(len(r) > 0) |
|
118 |
for img in r: |
|
119 |
for term in ( |
|
120 |
'status', |
|
121 |
'name', |
|
122 |
'container_format', |
|
123 |
'disk_format', |
|
124 |
'id', |
|
125 |
'size'): |
|
126 |
self.assertTrue(term in img) |
|
127 |
self.assertTrue(r, r0) |
|
128 |
r0.reverse() |
|
129 |
for i, img in enumerate(r): |
|
130 |
self.assert_dicts_are_deeply_equal(img, r0[i]) |
|
131 |
r1 = self.client.list_public(detail=True) |
|
132 |
for img in r1: |
|
133 |
for term in ( |
|
134 |
'status', |
|
135 |
'name', |
|
136 |
'checksum', |
|
137 |
'created_at', |
|
138 |
'disk_format', |
|
139 |
'updated_at', |
|
140 |
'id', |
|
141 |
'location', |
|
142 |
'container_format', |
|
143 |
'owner', |
|
144 |
'is_public', |
|
145 |
'deleted_at', |
|
146 |
'properties', |
|
147 |
'size'): |
|
148 |
self.assertTrue(term in img) |
|
149 |
if img['properties']: |
|
150 |
for interm in ( |
|
151 |
'osfamily', |
|
152 |
'users', |
|
153 |
'os', |
|
154 |
'root_partition', |
|
155 |
'description'): |
|
156 |
self.assertTrue(interm in img['properties']) |
|
157 |
size_max = 1000000000 |
|
158 |
r2 = self.client.list_public(filters=dict(size_max=size_max)) |
|
159 |
self.assertTrue(len(r2) <= len(r)) |
|
160 |
for img in r2: |
|
161 |
self.assertTrue(int(img['size']) <= size_max) |
|
162 |
|
|
104 |
with patch.object( |
|
105 |
self.C, |
|
106 |
'perform_request', |
|
107 |
return_value=self.FR()) as perform_req: |
|
108 |
r = self.client.list_public() |
|
109 |
self.assertEqual(self.client.http_client.url, self.url) |
|
110 |
self.assertEqual(self.client.http_client.path, '/images/') |
|
111 |
params = perform_req.call_args[0][3] |
|
112 |
self.assertEqual(params['sort_dir'], 'asc') |
|
113 |
for i in range(len(r)): |
|
114 |
self.assert_dicts_are_deeply_equal(r[i], example_images[i]) |
|
115 |
|
|
116 |
r = self.client.list_public(order='-') |
|
117 |
params = perform_req.call_args[0][3] |
|
118 |
self.assertEqual(params['sort_dir'], 'desc') |
|
119 |
self.assertEqual(self.client.http_client.url, self.url) |
|
120 |
self.assertEqual(self.client.http_client.path, '/images/') |
|
121 |
|
|
122 |
r = self.client.list_public(detail=True) |
|
123 |
self.assertEqual(self.client.http_client.url, self.url) |
|
124 |
self.assertEqual(self.client.http_client.path, '/images/detail') |
|
125 |
|
|
126 |
size_max = 1000000000 |
|
127 |
r = self.client.list_public(filters=dict(size_max=size_max)) |
|
128 |
params = perform_req.call_args[0][3] |
|
129 |
self.assertEqual(params['size_max'], size_max) |
|
130 |
self.assertEqual(self.client.http_client.url, self.url) |
|
131 |
self.assertEqual(self.client.http_client.path, '/images/') |
|
132 |
|
|
133 |
""" |
|
163 | 134 |
def test_get_meta(self): |
164 |
"""Test get_meta"""
|
|
135 |
""Test get_meta""
|
|
165 | 136 |
self._test_get_meta() |
166 | 137 |
|
167 | 138 |
def _test_get_meta(self): |
... | ... | |
192 | 163 |
self.assertTrue(interm in r['properties']) |
193 | 164 |
|
194 | 165 |
def test_register(self): |
195 |
"""Test register"""
|
|
166 |
""Test register""
|
|
196 | 167 |
self._prepare_img() |
197 | 168 |
self._test_register() |
198 | 169 |
|
... | ... | |
202 | 173 |
self.assertTrue(img is not None) |
203 | 174 |
|
204 | 175 |
def test_reregister(self): |
205 |
"""Test reregister"""
|
|
176 |
""Test reregister""
|
|
206 | 177 |
self._prepare_img() |
207 | 178 |
self._test_reregister() |
208 | 179 |
|
... | ... | |
212 | 183 |
properties=dict(my_property='some_value')) |
213 | 184 |
|
214 | 185 |
def test_set_members(self): |
215 |
"""Test set_members"""
|
|
186 |
""Test set_members""
|
|
216 | 187 |
self._prepare_img() |
217 | 188 |
self._test_set_members() |
218 | 189 |
|
... | ... | |
224 | 195 |
self.assertEqual(r[0]['member_id'], members[0]) |
225 | 196 |
|
226 | 197 |
def test_list_members(self): |
227 |
"""Test list_members"""
|
|
198 |
""Test list_members""
|
|
228 | 199 |
self._test_list_members() |
229 | 200 |
|
230 | 201 |
def _test_list_members(self): |
231 | 202 |
self._test_set_members() |
232 | 203 |
|
233 | 204 |
def test_remove_members(self): |
234 |
"""Test remove_members - NO CHECK"""
|
|
205 |
""Test remove_members - NO CHECK""
|
|
235 | 206 |
self._prepare_img() |
236 | 207 |
self._test_remove_members() |
237 | 208 |
|
... | ... | |
248 | 219 |
self.assertEqual(r0[0]['member_id'], members[1]) |
249 | 220 |
|
250 | 221 |
def test_list_shared(self): |
251 |
"""Test list_shared - NOT CHECKED"""
|
|
222 |
""Test list_shared - NOT CHECKED""
|
|
252 | 223 |
self._test_list_shared() |
253 | 224 |
|
254 | 225 |
def _test_list_shared(self): |
255 | 226 |
#No way to test this, if I dont have member images |
256 | 227 |
pass |
228 |
""" |
Also available in: Unified diff