root / pithos / api / tests.py @ 307915f1
History | View | Annotate | Download (67.9 kB)
1 |
# Copyright 2011 GRNET S.A. All rights reserved.
|
---|---|
2 |
#
|
3 |
# Redistribution and use in source and binary forms, with or
|
4 |
# without modification, are permitted provided that the following
|
5 |
# conditions are met:
|
6 |
#
|
7 |
# 1. Redistributions of source code must retain the above
|
8 |
# copyright notice, this list of conditions and the following
|
9 |
# disclaimer.
|
10 |
#
|
11 |
# 2. Redistributions in binary form must reproduce the above
|
12 |
# copyright notice, this list of conditions and the following
|
13 |
# disclaimer in the documentation and/or other materials
|
14 |
# provided with the distribution.
|
15 |
#
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 |
# POSSIBILITY OF SUCH DAMAGE.
|
28 |
#
|
29 |
# The views and conclusions contained in the software and
|
30 |
# documentation are those of the authors and should not be
|
31 |
# interpreted as representing official policies, either expressed
|
32 |
# or implied, of GRNET S.A.
|
33 |
|
34 |
from django.test.client import Client |
35 |
from django.test import TestCase |
36 |
from django.utils import simplejson as json |
37 |
from xml.dom import minidom |
38 |
import types |
39 |
import hashlib |
40 |
import os |
41 |
import mimetypes |
42 |
import random |
43 |
import datetime |
44 |
import string |
45 |
from pithos.backends import backend |
46 |
|
47 |
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
|
48 |
"%A, %d-%b-%y %H:%M:%S GMT",
|
49 |
"%a, %d %b %Y %H:%M:%S GMT"]
|
50 |
|
51 |
class AaiClient(Client): |
52 |
def request(self, **request): |
53 |
request['HTTP_X_AUTH_TOKEN'] = '0000' |
54 |
return super(AaiClient, self).request(**request) |
55 |
|
56 |
class BaseTestCase(TestCase): |
57 |
#TODO unauthorized request
|
58 |
def setUp(self): |
59 |
self.client = AaiClient()
|
60 |
self.headers = {
|
61 |
'account':(
|
62 |
'X-Account-Container-Count',
|
63 |
'X-Account-Bytes-Used',
|
64 |
'Last-Modified',
|
65 |
'Content-Length',
|
66 |
'Date',
|
67 |
'Content-Type',),
|
68 |
'container':(
|
69 |
'X-Container-Object-Count',
|
70 |
'X-Container-Bytes-Used',
|
71 |
'Content-Type',
|
72 |
'Last-Modified',
|
73 |
'Content-Length',
|
74 |
'Date',
|
75 |
'X-Container-Block-Size',
|
76 |
'X-Container-Block-Hash',
|
77 |
'X-Container-Policy-Quota',
|
78 |
'X-Container-Policy-Versioning',),
|
79 |
'object':(
|
80 |
'ETag',
|
81 |
'Content-Length',
|
82 |
'Content-Type',
|
83 |
'Content-Encoding',
|
84 |
'Last-Modified',
|
85 |
'Date',
|
86 |
'X-Object-Manifest',
|
87 |
'Content-Range',
|
88 |
'X-Object-Modified-By',
|
89 |
'X-Object-Version',
|
90 |
'X-Object-Version-Timestamp',)}
|
91 |
self.contentTypes = {'xml':'application/xml', |
92 |
'json':'application/json', |
93 |
'':'text/plain'} |
94 |
self.extended = {
|
95 |
'container':(
|
96 |
'name',
|
97 |
'count',
|
98 |
'bytes',
|
99 |
'last_modified'),
|
100 |
'object':(
|
101 |
'name',
|
102 |
'hash',
|
103 |
'bytes',
|
104 |
'content_type',
|
105 |
'content_encoding',
|
106 |
'last_modified',)}
|
107 |
self.return_codes = (400, 401, 404, 503,) |
108 |
|
109 |
def assertFault(self, response, status_code, name): |
110 |
self.assertEqual(response.status_code, status_code)
|
111 |
|
112 |
def assertBadRequest(self, response): |
113 |
self.assertFault(response, 400, 'badRequest') |
114 |
|
115 |
def assertItemNotFound(self, response): |
116 |
self.assertFault(response, 404, 'itemNotFound') |
117 |
|
118 |
def assertUnauthorized(self, response): |
119 |
self.assertFault(response, 401, 'unauthorized') |
120 |
|
121 |
def assertServiceUnavailable(self, response): |
122 |
self.assertFault(response, 503, 'serviceUnavailable') |
123 |
|
124 |
def assertNonEmpty(self, response): |
125 |
self.assertFault(response, 409, 'nonEmpty') |
126 |
|
127 |
def assert_status(self, response, codes): |
128 |
l = [elem for elem in self.return_codes] |
129 |
if type(codes) == types.ListType: |
130 |
l.extend(codes) |
131 |
else:
|
132 |
l.append(codes) |
133 |
self.assertTrue(response.status_code in l) |
134 |
|
135 |
def get_account_meta(self, account, exp_meta={}): |
136 |
path = '/v1/%s' % account
|
137 |
response = self.client.head(path)
|
138 |
self.assert_status(response, 204) |
139 |
self.assert_headers(response, 'account', exp_meta) |
140 |
return response
|
141 |
|
142 |
def list_containers(self, account, limit=10000, marker='', format='', |
143 |
**headers): |
144 |
params = locals()
|
145 |
params.pop('self')
|
146 |
params.pop('account')
|
147 |
path = '/v1/%s' % account
|
148 |
response = self.client.get(path, params, **headers)
|
149 |
self.assert_status(response, [200, 204, 304, 412]) |
150 |
response.content = response.content.strip() |
151 |
if format:
|
152 |
self.assert_extended(response, format, 'container', limit) |
153 |
else:
|
154 |
names = get_content_splitted(response) |
155 |
self.assertTrue(len(names) <= limit) |
156 |
return response
|
157 |
|
158 |
def update_account_meta(self, account, **metadata): |
159 |
path = '/v1/%s' % account
|
160 |
response = self.client.post(path, **metadata)
|
161 |
response.content = response.content |
162 |
self.assert_status(response, 202) |
163 |
return response
|
164 |
|
165 |
def get_container_meta(self, account, container, exp_meta={}): |
166 |
params = locals()
|
167 |
params.pop('self')
|
168 |
params.pop('account')
|
169 |
params.pop('container')
|
170 |
path = '/v1/%s/%s' %(account, container)
|
171 |
response = self.client.head(path, params)
|
172 |
response.content = response.content |
173 |
self.assert_status(response, 204) |
174 |
if response.status_code == 204: |
175 |
self.assert_headers(response, 'container', exp_meta) |
176 |
return response
|
177 |
|
178 |
def list_objects(self, account, container, limit=10000, marker='', |
179 |
prefix='', format='', path='', delimiter='', meta='', |
180 |
**headers): |
181 |
params = locals()
|
182 |
params.pop('self')
|
183 |
params.pop('account')
|
184 |
params.pop('container')
|
185 |
path = '/v1/%s/%s' % (account, container)
|
186 |
response = self.client.get(path, params, **headers)
|
187 |
response.content = response.content.strip() |
188 |
if format:
|
189 |
self.assert_extended(response, format, 'object', limit) |
190 |
self.assert_status(response, [200, 204, 304, 412]) |
191 |
return response
|
192 |
|
193 |
def create_container(self, account, name, **meta): |
194 |
path = '/v1/%s/%s' %(account, name)
|
195 |
response = self.client.put(path, **meta)
|
196 |
response.content = response.content |
197 |
self.assert_status(response, [201, 202]) |
198 |
return response
|
199 |
|
200 |
def update_container_meta(self, account, name, **meta): |
201 |
path = '/v1/%s/%s' %(account, name)
|
202 |
response = self.client.post(path,
|
203 |
data=None,
|
204 |
content_type='text/xml',
|
205 |
follow=False, **meta)
|
206 |
response.content = response.content |
207 |
self.assert_status(response, 202) |
208 |
return response
|
209 |
|
210 |
def delete_container(self, account, container): |
211 |
path = '/v1/%s/%s' %(account, container)
|
212 |
response = self.client.delete(path)
|
213 |
response.content = response.content |
214 |
self.assert_status(response, [204, 409]) |
215 |
return response
|
216 |
|
217 |
def get_object_meta(self, account, container, name): |
218 |
path = '/v1/%s/%s/%s' %(account, container, name)
|
219 |
response = self.client.head(path)
|
220 |
response.content = response.content |
221 |
self.assert_status(response, 200) |
222 |
return response
|
223 |
|
224 |
def get_object(self, account, container, name, format='', **headers): |
225 |
path = '/v1/%s/%s/%s' %(account, container, name)
|
226 |
response = self.client.get(path, {'format':format}, **headers) |
227 |
response.content = response.content |
228 |
self.assert_status(response, [200, 206, 304, 412, 416]) |
229 |
if response.status_code in [200, 206]: |
230 |
self.assert_headers(response, 'object') |
231 |
return response
|
232 |
|
233 |
def upload_object(self, account, container, name, data, content_type='', |
234 |
**headers): |
235 |
path = '/v1/%s/%s/%s' %(account, container, name)
|
236 |
response = self.client.put(path, data, content_type, **headers)
|
237 |
response.content = response.content |
238 |
self.assert_status(response, [201, 411, 422]) |
239 |
if response.status_code == 201: |
240 |
self.assertTrue(response['Etag']) |
241 |
return response
|
242 |
|
243 |
def copy_object(self, account, container, name, src, **headers): |
244 |
path = '/v1/%s/%s/%s' %(account, container, name)
|
245 |
headers['HTTP_X_COPY_FROM'] = src
|
246 |
response = self.client.put(path, **headers)
|
247 |
response.content = response.content |
248 |
self.assert_status(response, 201) |
249 |
return response
|
250 |
|
251 |
def move_object(self, account, container, name, src, **headers): |
252 |
path = '/v1/%s/%s/%s' % (account, container, name)
|
253 |
headers['HTTP_X_MOVE_FROM'] = src
|
254 |
response = self.client.put(path, **headers)
|
255 |
response.content = response.content |
256 |
self.assert_status(response, 201) |
257 |
return response
|
258 |
|
259 |
def update_object(self, account, container, name, data='', |
260 |
content_type='', **headers):
|
261 |
path = '/v1/%s/%s/%s' %(account, container, name)
|
262 |
response = self.client.post(path, data, content_type, **headers)
|
263 |
response.content = response.content |
264 |
self.assert_status(response, [202, 204, 416]) |
265 |
return response
|
266 |
|
267 |
def delete_object(self, account, container, name): |
268 |
path = '/v1/%s/%s/%s' %(account, container, name)
|
269 |
response = self.client.delete(path)
|
270 |
response.content = response.content |
271 |
self.assert_status(response, 204) |
272 |
return response
|
273 |
|
274 |
def assert_headers(self, response, type, exp_meta={}): |
275 |
entities = ['Account', 'Container', 'Container-Object', 'Object'] |
276 |
user_defined_meta = ['X-%s-Meta' %elem for elem in entities] |
277 |
headers = [item for item in response._headers.values()] |
278 |
t = tuple(user_defined_meta)
|
279 |
system_headers = [h for h in headers if not h[0].startswith(t)] |
280 |
for h in system_headers: |
281 |
self.assertTrue(h[0] in self.headers[type]) |
282 |
if exp_meta:
|
283 |
self.assertEqual(h[1], exp_meta[h[0]]) |
284 |
|
285 |
def assert_extended(self, response, format, type, size): |
286 |
exp_content_type = self.contentTypes[format]
|
287 |
self.assertEqual(response['Content-Type'].find(exp_content_type), 0) |
288 |
if format == 'xml': |
289 |
self.assert_xml(response, type, size) |
290 |
elif format == 'json': |
291 |
self.assert_json(response, type, size) |
292 |
|
293 |
def assert_json(self, response, type, size): |
294 |
convert = lambda s: s.lower()
|
295 |
info = [convert(elem) for elem in self.extended[type]] |
296 |
data = json.loads(response.content) |
297 |
self.assertTrue(len(data) <= size) |
298 |
for item in info: |
299 |
for i in data: |
300 |
if 'subdir' in i.keys(): |
301 |
continue
|
302 |
self.assertTrue(item in i.keys()) |
303 |
|
304 |
def assert_xml(self, response, type, size): |
305 |
convert = lambda s: s.lower()
|
306 |
info = [convert(elem) for elem in self.extended[type]] |
307 |
try:
|
308 |
info.remove('content_encoding')
|
309 |
except ValueError: |
310 |
pass
|
311 |
xml = minidom.parseString(response.content) |
312 |
for item in info: |
313 |
nodes = xml.getElementsByTagName(item) |
314 |
self.assertTrue(nodes)
|
315 |
self.assertTrue(len(nodes) <= size) |
316 |
|
317 |
|
318 |
def upload_os_file(self, account, container, fullpath, meta={}): |
319 |
try:
|
320 |
f = open(fullpath, 'r') |
321 |
data = f.read() |
322 |
name = os.path.split(fullpath)[-1]
|
323 |
return self.upload_data(account, container, name, data) |
324 |
except IOError: |
325 |
return
|
326 |
|
327 |
def upload_random_data(self, account, container, name, length=1024, |
328 |
meta={}): |
329 |
data = get_random_data(length) |
330 |
return self.upload_data(account, container, name, data, meta) |
331 |
|
332 |
def upload_data(self, account, container, name, data, meta={}): |
333 |
obj = {} |
334 |
obj['name'] = name
|
335 |
try:
|
336 |
obj['data'] = data
|
337 |
obj['hash'] = compute_md5_hash(obj['data']) |
338 |
meta.update({'HTTP_X_OBJECT_META_TEST':'test1', |
339 |
'HTTP_ETAG':obj['hash']}) |
340 |
type, enc = mimetypes.guess_type(name)
|
341 |
meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text' |
342 |
if enc:
|
343 |
meta['HTTP_CONTENT_ENCODING'] = enc
|
344 |
|
345 |
obj['meta'] = meta
|
346 |
r = self.upload_object(account,
|
347 |
container, |
348 |
obj['name'],
|
349 |
obj['data'],
|
350 |
meta['HTTP_CONTENT_TYPE'],
|
351 |
**meta) |
352 |
if r.status_code == 201: |
353 |
return obj
|
354 |
except IOError: |
355 |
return
|
356 |
|
357 |
class AccountHead(BaseTestCase): |
358 |
def setUp(self): |
359 |
BaseTestCase.setUp(self)
|
360 |
self.account = 'test' |
361 |
self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears'] |
362 |
for item in self.containers: |
363 |
self.create_container(self.account, item) |
364 |
|
365 |
def tearDown(self): |
366 |
for c in get_content_splitted(self.list_containers(self.account)): |
367 |
self.delete_container(self.account, c) |
368 |
|
369 |
def test_get_account_meta(self): |
370 |
response = self.get_account_meta(self.account) |
371 |
r2 = self.list_containers(self.account) |
372 |
containers = get_content_splitted(r2) |
373 |
l = str(len(containers)) |
374 |
self.assertEqual(response['X-Account-Container-Count'], l) |
375 |
size = 0
|
376 |
for c in containers: |
377 |
r = self.get_container_meta(self.account, c) |
378 |
size = size + int(r['X-Container-Bytes-Used']) |
379 |
self.assertEqual(response['X-Account-Bytes-Used'], str(size)) |
380 |
|
381 |
#def test_get_account_401(self):
|
382 |
# response = self.get_account_meta('non-existing-account')
|
383 |
# print response
|
384 |
# self.assertEqual(response.status_code, 401)
|
385 |
|
386 |
class AccountGet(BaseTestCase): |
387 |
def setUp(self): |
388 |
BaseTestCase.setUp(self)
|
389 |
self.account = 'test' |
390 |
#create some containers
|
391 |
self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears'] |
392 |
for item in self.containers: |
393 |
self.create_container(self.account, item) |
394 |
|
395 |
def tearDown(self): |
396 |
for c in get_content_splitted(self.list_containers(self.account)): |
397 |
response = self.delete_container(self.account, c) |
398 |
|
399 |
def test_list(self): |
400 |
#list containers
|
401 |
response = self.list_containers(self.account) |
402 |
containers = get_content_splitted(response) |
403 |
self.assertEquals(self.containers, containers) |
404 |
|
405 |
#def test_list_204(self):
|
406 |
# response = self.list_containers('non-existing-account')
|
407 |
# self.assertEqual(response.status_code, 204)
|
408 |
|
409 |
def test_list_with_limit(self): |
410 |
limit = 2
|
411 |
response = self.list_containers(self.account, limit=limit) |
412 |
containers = get_content_splitted(response) |
413 |
self.assertEquals(len(containers), limit) |
414 |
self.assertEquals(self.containers[:2], containers) |
415 |
|
416 |
def test_list_with_marker(self): |
417 |
l = 2
|
418 |
m = 'bananas'
|
419 |
response = self.list_containers(self.account, limit=l, marker=m) |
420 |
containers = get_content_splitted(response) |
421 |
i = self.containers.index(m) + 1 |
422 |
self.assertEquals(self.containers[i:(i+l)], containers) |
423 |
|
424 |
m = 'oranges'
|
425 |
response = self.list_containers(self.account, limit=l, marker=m) |
426 |
containers = get_content_splitted(response) |
427 |
i = self.containers.index(m) + 1 |
428 |
self.assertEquals(self.containers[i:(i+l)], containers) |
429 |
|
430 |
#def test_extended_list(self):
|
431 |
# self.list_containers(self.account, limit=3, format='xml')
|
432 |
# self.list_containers(self.account, limit=3, format='json')
|
433 |
|
434 |
def test_list_json_with_marker(self): |
435 |
l = 2
|
436 |
m = 'bananas'
|
437 |
response = self.list_containers(self.account, limit=l, marker=m, |
438 |
format='json')
|
439 |
containers = json.loads(response.content) |
440 |
self.assertEqual(containers[0]['name'], 'kiwis') |
441 |
self.assertEqual(containers[1]['name'], 'oranges') |
442 |
|
443 |
def test_list_xml_with_marker(self): |
444 |
l = 2
|
445 |
m = 'oranges'
|
446 |
response = self.list_containers(self.account, limit=l, marker=m, |
447 |
format='xml')
|
448 |
xml = minidom.parseString(response.content) |
449 |
nodes = xml.getElementsByTagName('name')
|
450 |
self.assertEqual(len(nodes), 1) |
451 |
self.assertEqual(nodes[0].childNodes[0].data, 'pears') |
452 |
|
453 |
def test_if_modified_since(self): |
454 |
t = datetime.datetime.utcnow() |
455 |
t2 = t - datetime.timedelta(minutes=10)
|
456 |
|
457 |
#add a new container
|
458 |
self.create_container(self.account, |
459 |
'dummy')
|
460 |
|
461 |
for f in DATE_FORMATS: |
462 |
past = t2.strftime(f) |
463 |
|
464 |
headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %past} |
465 |
r = self.list_containers(self.account, **headers) |
466 |
|
467 |
#assert get success
|
468 |
self.assertEqual(r.status_code, 200) |
469 |
|
470 |
def test_if_modified_since_invalid_date(self): |
471 |
headers = {'HTTP_IF_MODIFIED_SINCE':''} |
472 |
r = self.list_containers(self.account, **headers) |
473 |
|
474 |
#assert get success
|
475 |
self.assertEqual(r.status_code, 200) |
476 |
|
477 |
def test_if_not_modified_since(self): |
478 |
now = datetime.datetime.utcnow() |
479 |
since = now + datetime.timedelta(1)
|
480 |
|
481 |
for f in DATE_FORMATS: |
482 |
headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %since.strftime(f)} |
483 |
r = self.list_containers(self.account, **headers) |
484 |
|
485 |
#assert not modified
|
486 |
self.assertEqual(r.status_code, 304) |
487 |
|
488 |
def test_if_unmodified_since(self): |
489 |
now = datetime.datetime.utcnow() |
490 |
since = now + datetime.timedelta(1)
|
491 |
|
492 |
for f in DATE_FORMATS: |
493 |
headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %since.strftime(f)} |
494 |
r = self.list_containers(self.account, **headers) |
495 |
|
496 |
#assert success
|
497 |
self.assertEqual(r.status_code, 200) |
498 |
self.assertEqual(self.containers, get_content_splitted(r)) |
499 |
|
500 |
def test_if_unmodified_since_precondition_failed(self): |
501 |
t = datetime.datetime.utcnow() |
502 |
t2 = t - datetime.timedelta(minutes=10)
|
503 |
|
504 |
#add a new container
|
505 |
self.create_container(self.account, |
506 |
'dummy')
|
507 |
|
508 |
for f in DATE_FORMATS: |
509 |
past = t2.strftime(f) |
510 |
|
511 |
headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %past} |
512 |
r = self.list_containers(self.account, **headers) |
513 |
|
514 |
#assert get success
|
515 |
self.assertEqual(r.status_code, 412) |
516 |
|
517 |
class AccountPost(BaseTestCase): |
518 |
def setUp(self): |
519 |
BaseTestCase.setUp(self)
|
520 |
self.account = 'test' |
521 |
self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears'] |
522 |
for item in self.containers: |
523 |
self.create_container(self.account, item) |
524 |
|
525 |
def tearDown(self): |
526 |
for c in get_content_splitted(self.list_containers(self.account)): |
527 |
self.delete_container(self.account, c) |
528 |
|
529 |
def test_update_meta(self): |
530 |
meta = {'HTTP_X_ACCOUNT_META_TEST':'test', |
531 |
'HTTP_X_ACCOUNT_META_TOST':'tost'} |
532 |
response = self.update_account_meta(self.account, **meta) |
533 |
response = self.get_account_meta(self.account) |
534 |
for k,v in meta.items(): |
535 |
key = '-'.join(elem.capitalize() for elem in k.split('_')[1:]) |
536 |
self.assertTrue(response[key])
|
537 |
self.assertEqual(response[key], v)
|
538 |
|
539 |
#def test_invalid_account_update_meta(self):
|
540 |
# with AssertInvariant(self.get_account_meta, self.account):
|
541 |
# meta = {'HTTP_X_ACCOUNT_META_TEST':'test',
|
542 |
# 'HTTP_X_ACCOUNT_META_TOST':'tost'}
|
543 |
# response = self.update_account_meta('non-existing-account', **meta)
|
544 |
|
545 |
class ContainerHead(BaseTestCase): |
546 |
def setUp(self): |
547 |
BaseTestCase.setUp(self)
|
548 |
self.account = 'test' |
549 |
self.container = 'apples' |
550 |
self.create_container(self.account, self.container) |
551 |
|
552 |
def tearDown(self): |
553 |
for o in self.list_objects(self.account, self.container): |
554 |
self.delete_object(self.account, self.container, o) |
555 |
self.delete_container(self.account, self.container) |
556 |
|
557 |
def test_get_meta(self): |
558 |
headers = {'HTTP_X_OBJECT_META_TRASH':'true'} |
559 |
t1 = datetime.datetime.utcnow() |
560 |
o = self.upload_random_data(self.account, |
561 |
self.container,
|
562 |
'McIntosh.jpg',
|
563 |
meta=headers) |
564 |
if o:
|
565 |
r = self.get_container_meta(self.account, |
566 |
self.container)
|
567 |
self.assertEqual(r['X-Container-Object-Count'], '1') |
568 |
self.assertEqual(r['X-Container-Bytes-Used'], str(len(o['data']))) |
569 |
t2 = datetime.datetime.strptime(r['Last-Modified'], DATE_FORMATS[2]) |
570 |
delta = (t2 - t1) |
571 |
threashold = datetime.timedelta(seconds=1)
|
572 |
self.assertTrue(delta < threashold)
|
573 |
self.assertTrue(r['X-Container-Object-Meta']) |
574 |
self.assertTrue('Trash' in r['X-Container-Object-Meta']) |
575 |
|
576 |
class ContainerGet(BaseTestCase): |
577 |
def setUp(self): |
578 |
BaseTestCase.setUp(self)
|
579 |
self.account = 'test' |
580 |
self.container = ['pears', 'apples'] |
581 |
for c in self.container: |
582 |
self.create_container(self.account, c) |
583 |
self.obj = []
|
584 |
for o in o_names[:8]: |
585 |
self.obj.append(self.upload_random_data(self.account, |
586 |
self.container[0], |
587 |
o)) |
588 |
for o in o_names[8:]: |
589 |
self.obj.append(self.upload_random_data(self.account, |
590 |
self.container[1], |
591 |
o)) |
592 |
|
593 |
def tearDown(self): |
594 |
for c in self.container: |
595 |
for obj in get_content_splitted(self.list_objects(self.account, c)): |
596 |
self.delete_object(self.account, c, obj) |
597 |
self.delete_container(self.account, c) |
598 |
|
599 |
def test_list_objects(self): |
600 |
response = self.list_objects(self.account, self.container[0]) |
601 |
objects = get_content_splitted(response) |
602 |
l = [elem['name'] for elem in self.obj[:8]] |
603 |
l.sort() |
604 |
self.assertEqual(objects, l)
|
605 |
|
606 |
def test_list_objects_with_limit_marker(self): |
607 |
response = self.list_objects(self.account, self.container[0], limit=2) |
608 |
objects = get_content_splitted(response) |
609 |
l = [elem['name'] for elem in self.obj[:8]] |
610 |
l.sort() |
611 |
self.assertEqual(objects, l[:2]) |
612 |
|
613 |
markers = ['How To Win Friends And Influence People.pdf',
|
614 |
'moms_birthday.jpg']
|
615 |
limit = 4
|
616 |
for m in markers: |
617 |
response = self.list_objects(self.account, self.container[0], |
618 |
limit=limit, marker=m) |
619 |
objects = get_content_splitted(response) |
620 |
l = [elem['name'] for elem in self.obj[:8]] |
621 |
l.sort() |
622 |
start = l.index(m) + 1
|
623 |
end = start + limit |
624 |
end = len(l) >= end and end or len(l) |
625 |
self.assertEqual(objects, l[start:end])
|
626 |
|
627 |
def test_list_pseudo_hierarchical_folders(self): |
628 |
response = self.list_objects(self.account, self.container[1], |
629 |
prefix='photos', delimiter='/') |
630 |
objects = get_content_splitted(response) |
631 |
self.assertEquals(['photos/animals/', 'photos/me.jpg', |
632 |
'photos/plants/'], objects)
|
633 |
|
634 |
response = self.list_objects(self.account, self.container[1], |
635 |
prefix='photos/animals', delimiter='/') |
636 |
objs = get_content_splitted(response) |
637 |
l = ['photos/animals/cats/', 'photos/animals/dogs/'] |
638 |
self.assertEquals(l, objs)
|
639 |
|
640 |
response = self.list_objects(self.account, self.container[1], |
641 |
path='photos')
|
642 |
objects = get_content_splitted(response) |
643 |
self.assertEquals(['photos/me.jpg'], objects) |
644 |
|
645 |
def test_extended_list_json(self): |
646 |
response = self.list_objects(self.account, |
647 |
self.container[1], |
648 |
format='json', limit=2, |
649 |
prefix='photos/animals',
|
650 |
delimiter='/')
|
651 |
objects = json.loads(response.content) |
652 |
self.assertEqual(objects[0]['subdir'], 'photos/animals/cats/') |
653 |
self.assertEqual(objects[1]['subdir'], 'photos/animals/dogs/') |
654 |
|
655 |
def test_extended_list_xml(self): |
656 |
response = self.list_objects(self.account, self.container[1], |
657 |
format='xml', limit=4, prefix='photos', |
658 |
delimiter='/')
|
659 |
xml = minidom.parseString(response.content) |
660 |
dirs = xml.getElementsByTagName('subdir')
|
661 |
self.assertEqual(len(dirs), 2) |
662 |
self.assertEqual(dirs[0].attributes['name'].value, 'photos/animals/') |
663 |
self.assertEqual(dirs[1].attributes['name'].value, 'photos/plants/') |
664 |
|
665 |
objects = xml.getElementsByTagName('name')
|
666 |
self.assertEqual(len(objects), 1) |
667 |
self.assertEqual(objects[0].childNodes[0].data, 'photos/me.jpg') |
668 |
|
669 |
def test_list_meta_double_matching(self): |
670 |
meta = {'HTTP_X_OBJECT_META_QUALITY':'aaa', |
671 |
'HTTP_X_OBJECT_META_STOCK':'true'} |
672 |
r = self.update_object(self.account, |
673 |
self.container[0], |
674 |
self.obj[0]['name'], |
675 |
|
676 |
**meta) |
677 |
r = self.list_objects(self.account, |
678 |
self.container[0], |
679 |
meta='Quality,Stock')
|
680 |
self.assertEqual(r.status_code, 200) |
681 |
obj = get_content_splitted(r) |
682 |
self.assertEqual(len(obj), 1) |
683 |
self.assertTrue(obj, self.obj[0]['name']) |
684 |
|
685 |
def test_list_using_meta(self): |
686 |
meta = {'HTTP_X_OBJECT_META_QUALITY':'aaa'} |
687 |
for o in self.obj[:2]: |
688 |
r = self.update_object(self.account, |
689 |
self.container[0], |
690 |
o['name'],
|
691 |
**meta) |
692 |
meta = {'HTTP_X_OBJECT_META_STOCK':'true'} |
693 |
for o in self.obj[3:5]: |
694 |
r = self.update_object(self.account, |
695 |
self.container[0], |
696 |
o['name'],
|
697 |
**meta) |
698 |
|
699 |
r = self.list_objects(self.account, |
700 |
self.container[0], |
701 |
meta='Quality')
|
702 |
self.assertEqual(r.status_code, 200) |
703 |
obj = get_content_splitted(r) |
704 |
self.assertEqual(len(obj), 2) |
705 |
self.assertTrue(obj, [o['name'] for o in self.obj[:2]]) |
706 |
|
707 |
# test case insensitive
|
708 |
r = self.list_objects(self.account, |
709 |
self.container[0], |
710 |
meta='quality')
|
711 |
self.assertEqual(r.status_code, 200) |
712 |
obj = get_content_splitted(r) |
713 |
self.assertEqual(len(obj), 2) |
714 |
self.assertTrue(obj, [o['name'] for o in self.obj[:2]]) |
715 |
|
716 |
# test multiple matches
|
717 |
r = self.list_objects(self.account, |
718 |
self.container[0], |
719 |
meta='Quality,Stock')
|
720 |
self.assertEqual(r.status_code, 200) |
721 |
obj = get_content_splitted(r) |
722 |
self.assertEqual(len(obj), 4) |
723 |
self.assertTrue(obj, [o['name'] for o in self.obj[:4]]) |
724 |
|
725 |
# test non 1-1 multiple match
|
726 |
r = self.list_objects(self.account, |
727 |
self.container[0], |
728 |
meta='Quality,aaaa')
|
729 |
self.assertEqual(r.status_code, 200) |
730 |
obj = get_content_splitted(r) |
731 |
self.assertEqual(len(obj), 2) |
732 |
self.assertTrue(obj, [o['name'] for o in self.obj[:2]]) |
733 |
|
734 |
def test_if_modified_since(self): |
735 |
t = datetime.datetime.utcnow() |
736 |
t2 = t - datetime.timedelta(minutes=10)
|
737 |
|
738 |
#add a new container
|
739 |
self.upload_random_data(self.account, |
740 |
self.container[0], |
741 |
'dummy.txt')
|
742 |
|
743 |
for f in DATE_FORMATS: |
744 |
past = t2.strftime(f) |
745 |
|
746 |
headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %past} |
747 |
r = self.list_objects(self.account, |
748 |
self.container[0], **headers) |
749 |
|
750 |
#assert get success
|
751 |
self.assertEqual(r.status_code, 200) |
752 |
|
753 |
def test_if_modified_since_invalid_date(self): |
754 |
headers = {'HTTP_IF_MODIFIED_SINCE':''} |
755 |
r = self.list_objects(self.account, |
756 |
self.container[0], **headers) |
757 |
|
758 |
#assert get success
|
759 |
self.assertEqual(r.status_code, 200) |
760 |
|
761 |
def test_if_not_modified_since(self): |
762 |
now = datetime.datetime.utcnow() |
763 |
since = now + datetime.timedelta(1)
|
764 |
|
765 |
for f in DATE_FORMATS: |
766 |
headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %since.strftime(f)} |
767 |
r = self.list_objects(self.account, |
768 |
self.container[0], **headers) |
769 |
|
770 |
#assert not modified
|
771 |
self.assertEqual(r.status_code, 304) |
772 |
|
773 |
def test_if_unmodified_since(self): |
774 |
now = datetime.datetime.utcnow() |
775 |
since = now + datetime.timedelta(1)
|
776 |
|
777 |
for f in DATE_FORMATS: |
778 |
headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %since.strftime(f)} |
779 |
r = self.list_objects(self.account, |
780 |
self.container[0], **headers) |
781 |
|
782 |
#assert success
|
783 |
self.assertEqual(r.status_code, 200) |
784 |
objlist = self.list_objects(self.account, self.container[0]) |
785 |
self.assertEqual(get_content_splitted(r),
|
786 |
get_content_splitted(objlist)) |
787 |
|
788 |
def test_if_unmodified_since_precondition_failed(self): |
789 |
t = datetime.datetime.utcnow() |
790 |
t2 = t - datetime.timedelta(minutes=10)
|
791 |
|
792 |
#add a new container
|
793 |
self.create_container(self.account, |
794 |
'dummy')
|
795 |
|
796 |
for f in DATE_FORMATS: |
797 |
past = t2.strftime(f) |
798 |
|
799 |
headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %past} |
800 |
r = self.list_objects(self.account, |
801 |
self.container[0], **headers) |
802 |
|
803 |
#assert get success
|
804 |
self.assertEqual(r.status_code, 412) |
805 |
|
806 |
class ContainerPut(BaseTestCase): |
807 |
def setUp(self): |
808 |
BaseTestCase.setUp(self)
|
809 |
self.account = 'test' |
810 |
self.containers = ['c1', 'c2'] |
811 |
|
812 |
def tearDown(self): |
813 |
for c in self.containers: |
814 |
r = self.delete_container(self.account, c) |
815 |
|
816 |
def test_create(self): |
817 |
response = self.create_container(self.account, self.containers[0]) |
818 |
if response.status_code == 201: |
819 |
response = self.list_containers(self.account) |
820 |
content = get_content_splitted(response) |
821 |
self.assertTrue(self.containers[0] in content) |
822 |
r = self.get_container_meta(self.account, self.containers[0]) |
823 |
self.assertEqual(r.status_code, 204) |
824 |
|
825 |
def test_create_twice(self): |
826 |
response = self.create_container(self.account, self.containers[0]) |
827 |
if response.status_code == 201: |
828 |
r = self.create_container(self.account, self.containers[0]) |
829 |
self.assertTrue(r.status_code, 202) |
830 |
|
831 |
class ContainerPost(BaseTestCase): |
832 |
def setUp(self): |
833 |
BaseTestCase.setUp(self)
|
834 |
self.account = 'test' |
835 |
self.container = 'apples' |
836 |
self.create_container(self.account, self.container) |
837 |
|
838 |
def tearDown(self): |
839 |
for o in self.list_objects(self.account, self.container): |
840 |
self.delete_object(self.account, self.container, o) |
841 |
self.delete_container(self.account, self.container) |
842 |
|
843 |
def test_update_meta(self): |
844 |
meta = {'HTTP_X_CONTAINER_META_TEST':'test33', |
845 |
'HTTP_X_CONTAINER_META_TOST':'tost22'} |
846 |
response = self.update_container_meta(self.account, self.container, |
847 |
**meta) |
848 |
response = self.get_container_meta(self.account, self.container) |
849 |
for k,v in meta.items(): |
850 |
key = '-'.join(elem.capitalize() for elem in k.split('_')[1:]) |
851 |
self.assertTrue(response[key])
|
852 |
self.assertEqual(response[key], v)
|
853 |
|
854 |
class ContainerDelete(BaseTestCase): |
855 |
def setUp(self): |
856 |
BaseTestCase.setUp(self)
|
857 |
self.account = 'test' |
858 |
self.containers = ['c1', 'c2'] |
859 |
for c in self.containers: |
860 |
self.create_container(self.account, c) |
861 |
self.upload_random_data(self.account, |
862 |
self.containers[1], |
863 |
'nice.jpg')
|
864 |
|
865 |
def tearDown(self): |
866 |
for c in self.containers: |
867 |
for o in get_content_splitted(self.list_objects(self.account, c)): |
868 |
self.delete_object(self.account, c, o) |
869 |
self.delete_container(self.account, c) |
870 |
|
871 |
def test_delete(self): |
872 |
r = self.delete_container(self.account, self.containers[0]) |
873 |
self.assertEqual(r.status_code, 204) |
874 |
|
875 |
def test_delete_non_empty(self): |
876 |
r = self.delete_container(self.account, self.containers[1]) |
877 |
self.assertNonEmpty(r)
|
878 |
|
879 |
def test_delete_invalid(self): |
880 |
self.assertItemNotFound(self.delete_container(self.account, 'c3')) |
881 |
|
882 |
class ObjectHead(BaseTestCase): |
883 |
pass
|
884 |
|
885 |
class ObjectGet(BaseTestCase): |
886 |
def setUp(self): |
887 |
BaseTestCase.setUp(self)
|
888 |
self.account = 'test' |
889 |
self.containers = ['c1', 'c2'] |
890 |
#create some containers
|
891 |
for c in self.containers: |
892 |
self.create_container(self.account, c) |
893 |
|
894 |
#upload a file
|
895 |
self.objects = []
|
896 |
self.objects.append(self.upload_os_file(self.account, |
897 |
self.containers[1], |
898 |
'./api/tests.py'))
|
899 |
self.objects.append(self.upload_os_file(self.account, |
900 |
self.containers[1], |
901 |
'settings.py'))
|
902 |
|
903 |
def tearDown(self): |
904 |
for c in self.containers: |
905 |
for o in get_content_splitted(self.list_objects(self.account, c)): |
906 |
self.delete_object(self.account, c, o) |
907 |
self.delete_container(self.account, c) |
908 |
|
909 |
def test_get(self): |
910 |
#perform get
|
911 |
r = self.get_object(self.account, |
912 |
self.containers[1], |
913 |
self.objects[0]['name'], |
914 |
self.objects[0]['meta']) |
915 |
#assert success
|
916 |
self.assertEqual(r.status_code, 200) |
917 |
|
918 |
#assert content-type
|
919 |
self.assertEqual(r['Content-Type'], |
920 |
self.objects[0]['meta']['HTTP_CONTENT_TYPE']) |
921 |
|
922 |
def test_get_invalid(self): |
923 |
r = self.get_object(self.account, |
924 |
self.containers[0], |
925 |
self.objects[0]['name']) |
926 |
self.assertItemNotFound(r)
|
927 |
|
928 |
def test_get_partial(self): |
929 |
#perform get with range
|
930 |
headers = {'HTTP_RANGE':'bytes=0-499'} |
931 |
r = self.get_object(self.account, |
932 |
self.containers[1], |
933 |
self.objects[0]['name'], |
934 |
**headers) |
935 |
|
936 |
#assert successful partial content
|
937 |
self.assertEqual(r.status_code, 206) |
938 |
|
939 |
#assert content-type
|
940 |
self.assertEqual(r['Content-Type'], |
941 |
self.objects[0]['meta']['HTTP_CONTENT_TYPE']) |
942 |
|
943 |
#assert content length
|
944 |
self.assertEqual(int(r['Content-Length']), 500) |
945 |
|
946 |
#assert content
|
947 |
self.assertEqual(self.objects[0]['data'][:500], r.content) |
948 |
|
949 |
def test_get_final_500(self): |
950 |
#perform get with range
|
951 |
headers = {'HTTP_RANGE':'bytes=-500'} |
952 |
r = self.get_object(self.account, |
953 |
self.containers[1], |
954 |
self.objects[0]['name'], |
955 |
**headers) |
956 |
|
957 |
#assert successful partial content
|
958 |
self.assertEqual(r.status_code, 206) |
959 |
|
960 |
#assert content-type
|
961 |
self.assertEqual(r['Content-Type'], |
962 |
self.objects[0]['meta']['HTTP_CONTENT_TYPE']) |
963 |
|
964 |
#assert content length
|
965 |
self.assertEqual(int(r['Content-Length']), 500) |
966 |
|
967 |
#assert content
|
968 |
self.assertTrue(self.objects[0]['data'][-500:], r.content) |
969 |
|
970 |
def test_get_rest(self): |
971 |
#perform get with range
|
972 |
offset = len(self.objects[0]['data']) - 500 |
973 |
headers = {'HTTP_RANGE':'bytes=%s-' %offset} |
974 |
r = self.get_object(self.account, |
975 |
self.containers[1], |
976 |
self.objects[0]['name'], |
977 |
**headers) |
978 |
|
979 |
#assert successful partial content
|
980 |
self.assertEqual(r.status_code, 206) |
981 |
|
982 |
#assert content-type
|
983 |
self.assertEqual(r['Content-Type'], |
984 |
self.objects[0]['meta']['HTTP_CONTENT_TYPE']) |
985 |
|
986 |
#assert content length
|
987 |
self.assertEqual(int(r['Content-Length']), 500) |
988 |
|
989 |
#assert content
|
990 |
self.assertTrue(self.objects[0]['data'][-500:], r.content) |
991 |
|
992 |
def test_get_range_not_satisfiable(self): |
993 |
#perform get with range
|
994 |
offset = len(self.objects[0]['data']) + 1 |
995 |
headers = {'HTTP_RANGE':'bytes=0-%s' %offset} |
996 |
r = self.get_object(self.account, |
997 |
self.containers[1], |
998 |
self.objects[0]['name'], |
999 |
**headers) |
1000 |
|
1001 |
#assert Range Not Satisfiable
|
1002 |
self.assertEqual(r.status_code, 416) |
1003 |
|
1004 |
def test_multiple_range(self): |
1005 |
#perform get with multiple range
|
1006 |
ranges = ['0-499', '-500', '1000-'] |
1007 |
headers = {'HTTP_RANGE' : 'bytes=%s' % ','.join(ranges)} |
1008 |
r = self.get_object(self.account, |
1009 |
self.containers[1], |
1010 |
self.objects[0]['name'], |
1011 |
**headers) |
1012 |
|
1013 |
# assert partial content
|
1014 |
self.assertEqual(r.status_code, 206) |
1015 |
|
1016 |
# assert Content-Type of the reply will be multipart/byteranges
|
1017 |
self.assertTrue(r['Content-Type']) |
1018 |
content_type_parts = r['Content-Type'].split()
|
1019 |
self.assertEqual(content_type_parts[0], ('multipart/byteranges;')) |
1020 |
|
1021 |
boundary = '--%s' %content_type_parts[1].split('=')[-1:][0] |
1022 |
cparts = r.content.split(boundary)[1:-1] |
1023 |
|
1024 |
# assert content parts are exactly 2
|
1025 |
self.assertEqual(len(cparts), len(ranges)) |
1026 |
|
1027 |
# for each content part assert headers
|
1028 |
i = 0
|
1029 |
for cpart in cparts: |
1030 |
content = cpart.split('\r\n')
|
1031 |
headers = content[1:3] |
1032 |
content_range = headers[0].split(': ') |
1033 |
self.assertEqual(content_range[0], 'Content-Range') |
1034 |
|
1035 |
r = ranges[i].split('-')
|
1036 |
if not r[0] and not r[1]: |
1037 |
pass
|
1038 |
elif not r[0]: |
1039 |
start = len(self.objects[0]['data']) - int(r[1]) |
1040 |
end = len(self.objects[0]['data']) |
1041 |
elif not r[1]: |
1042 |
start = int(r[0]) |
1043 |
end = len(self.objects[0]['data']) |
1044 |
else:
|
1045 |
start = int(r[0]) |
1046 |
end = int(r[1]) + 1 |
1047 |
fdata = self.objects[0]['data'][start:end] |
1048 |
sdata = '\r\n'.join(content[4:-1]) |
1049 |
self.assertEqual(len(fdata), len(sdata)) |
1050 |
self.assertEquals(fdata, sdata)
|
1051 |
i+=1
|
1052 |
|
1053 |
def test_multiple_range_not_satisfiable(self): |
1054 |
#perform get with multiple range
|
1055 |
out_of_range = len(self.objects[0]['data']) + 1 |
1056 |
ranges = ['0-499', '-500', '%d-' %out_of_range] |
1057 |
headers = {'HTTP_RANGE' : 'bytes=%s' % ','.join(ranges)} |
1058 |
r = self.get_object(self.account, |
1059 |
self.containers[1], |
1060 |
self.objects[0]['name'], |
1061 |
**headers) |
1062 |
|
1063 |
# assert partial content
|
1064 |
self.assertEqual(r.status_code, 416) |
1065 |
|
1066 |
def test_get_with_if_match(self): |
1067 |
#perform get with If-Match
|
1068 |
headers = {'HTTP_IF_MATCH':self.objects[0]['hash']} |
1069 |
r = self.get_object(self.account, |
1070 |
self.containers[1], |
1071 |
self.objects[0]['name'], |
1072 |
**headers) |
1073 |
#assert get success
|
1074 |
self.assertEqual(r.status_code, 200) |
1075 |
|
1076 |
#assert content-type
|
1077 |
self.assertEqual(r['Content-Type'], |
1078 |
self.objects[0]['meta']['HTTP_CONTENT_TYPE']) |
1079 |
|
1080 |
#assert response content
|
1081 |
self.assertEqual(self.objects[0]['data'].strip(), r.content.strip()) |
1082 |
|
1083 |
def test_get_with_if_match_star(self): |
1084 |
#perform get with If-Match *
|
1085 |
headers = {'HTTP_IF_MATCH':'*'} |
1086 |
r = self.get_object(self.account, |
1087 |
self.containers[1], |
1088 |
self.objects[0]['name'], |
1089 |
**headers) |
1090 |
#assert get success
|
1091 |
self.assertEqual(r.status_code, 200) |
1092 |
|
1093 |
#assert content-type
|
1094 |
self.assertEqual(r['Content-Type'], |
1095 |
self.objects[0]['meta']['HTTP_CONTENT_TYPE']) |
1096 |
|
1097 |
#assert response content
|
1098 |
self.assertEqual(self.objects[0]['data'].strip(), r.content.strip()) |
1099 |
|
1100 |
def test_get_with_multiple_if_match(self): |
1101 |
#perform get with If-Match
|
1102 |
etags = [i['hash'] for i in self.objects if i] |
1103 |
etags = ','.join('"%s"' % etag for etag in etags) |
1104 |
headers = {'HTTP_IF_MATCH':etags}
|
1105 |
r = self.get_object(self.account, |
1106 |
self.containers[1], |
1107 |
self.objects[0]['name'], |
1108 |
**headers) |
1109 |
#assert get success
|
1110 |
self.assertEqual(r.status_code, 200) |
1111 |
|
1112 |
#assert content-type
|
1113 |
self.assertEqual(r['Content-Type'], |
1114 |
self.objects[0]['meta']['HTTP_CONTENT_TYPE']) |
1115 |
|
1116 |
#assert content-type
|
1117 |
self.assertEqual(r['Content-Type'], |
1118 |
self.objects[0]['meta']['HTTP_CONTENT_TYPE']) |
1119 |
|
1120 |
#assert response content
|
1121 |
self.assertEqual(self.objects[0]['data'].strip(), r.content.strip()) |
1122 |
|
1123 |
def test_if_match_precondition_failed(self): |
1124 |
#perform get with If-Match
|
1125 |
headers = {'HTTP_IF_MATCH':'123'} |
1126 |
r = self.get_object(self.account, |
1127 |
self.containers[1], |
1128 |
self.objects[0]['name'], |
1129 |
**headers) |
1130 |
#assert precondition failed
|
1131 |
self.assertEqual(r.status_code, 412) |
1132 |
|
1133 |
def test_if_none_match(self): |
1134 |
#perform get with If-None-Match
|
1135 |
headers = {'HTTP_IF_NONE_MATCH':'123'} |
1136 |
r = self.get_object(self.account, |
1137 |
self.containers[1], |
1138 |
self.objects[0]['name'], |
1139 |
**headers) |
1140 |
|
1141 |
#assert get success
|
1142 |
self.assertEqual(r.status_code, 200) |
1143 |
|
1144 |
#assert content-type
|
1145 |
self.assertEqual(r['Content-Type'], |
1146 |
self.objects[0]['meta']['HTTP_CONTENT_TYPE']) |
1147 |
|
1148 |
def test_if_none_match(self): |
1149 |
#perform get with If-None-Match *
|
1150 |
headers = {'HTTP_IF_NONE_MATCH':'*'} |
1151 |
r = self.get_object(self.account, |
1152 |
self.containers[1], |
1153 |
self.objects[0]['name'], |
1154 |
**headers) |
1155 |
|
1156 |
#assert get success
|
1157 |
self.assertEqual(r.status_code, 304) |
1158 |
|
1159 |
def test_if_none_match_not_modified(self): |
1160 |
#perform get with If-None-Match
|
1161 |
headers = {'HTTP_IF_NONE_MATCH':'%s' %self.objects[0]['hash']} |
1162 |
r = self.get_object(self.account, |
1163 |
self.containers[1], |
1164 |
self.objects[0]['name'], |
1165 |
**headers) |
1166 |
|
1167 |
#assert not modified
|
1168 |
self.assertEqual(r.status_code, 304) |
1169 |
self.assertEqual(r['ETag'], self.objects[0]['hash']) |
1170 |
|
1171 |
def test_if_modified_since(self): |
1172 |
t = datetime.datetime.utcnow() |
1173 |
t2 = t - datetime.timedelta(minutes=10)
|
1174 |
|
1175 |
#modify the object
|
1176 |
self.upload_object(self.account, |
1177 |
self.containers[1], |
1178 |
self.objects[0]['name'], |
1179 |
self.objects[0]['data'][:200]) |
1180 |
|
1181 |
for f in DATE_FORMATS: |
1182 |
past = t2.strftime(f) |
1183 |
|
1184 |
headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %past} |
1185 |
r = self.get_object(self.account, |
1186 |
self.containers[1], |
1187 |
self.objects[0]['name'], |
1188 |
**headers) |
1189 |
|
1190 |
#assert get success
|
1191 |
self.assertEqual(r.status_code, 200) |
1192 |
|
1193 |
#assert content-type
|
1194 |
self.assertEqual(r['Content-Type'], |
1195 |
self.objects[0]['meta']['HTTP_CONTENT_TYPE']) |
1196 |
|
1197 |
def test_if_modified_since_invalid_date(self): |
1198 |
headers = {'HTTP_IF_MODIFIED_SINCE':''} |
1199 |
r = self.get_object(self.account, |
1200 |
self.containers[1], |
1201 |
self.objects[0]['name'], |
1202 |
**headers) |
1203 |
|
1204 |
#assert get success
|
1205 |
self.assertEqual(r.status_code, 200) |
1206 |
|
1207 |
#assert content-type
|
1208 |
self.assertEqual(r['Content-Type'], |
1209 |
self.objects[0]['meta']['HTTP_CONTENT_TYPE']) |
1210 |
|
1211 |
def test_if_not_modified_since(self): |
1212 |
now = datetime.datetime.utcnow() |
1213 |
since = now + datetime.timedelta(1)
|
1214 |
|
1215 |
for f in DATE_FORMATS: |
1216 |
headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %since.strftime(f)} |
1217 |
r = self.get_object(self.account, |
1218 |
self.containers[1], |
1219 |
self.objects[0]['name'], |
1220 |
**headers) |
1221 |
|
1222 |
#assert not modified
|
1223 |
self.assertEqual(r.status_code, 304) |
1224 |
|
1225 |
def test_if_unmodified_since(self): |
1226 |
now = datetime.datetime.utcnow() |
1227 |
since = now + datetime.timedelta(1)
|
1228 |
|
1229 |
for f in DATE_FORMATS: |
1230 |
headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %since.strftime(f)} |
1231 |
r = self.get_object(self.account, |
1232 |
self.containers[1], |
1233 |
self.objects[0]['name'], |
1234 |
**headers) |
1235 |
#assert success
|
1236 |
self.assertEqual(r.status_code, 200) |
1237 |
self.assertEqual(self.objects[0]['data'], r.content) |
1238 |
|
1239 |
#assert content-type
|
1240 |
self.assertEqual(r['Content-Type'], |
1241 |
self.objects[0]['meta']['HTTP_CONTENT_TYPE']) |
1242 |
|
1243 |
def test_if_unmodified_since_precondition_failed(self): |
1244 |
t = datetime.datetime.utcnow() |
1245 |
t2 = t - datetime.timedelta(minutes=10)
|
1246 |
|
1247 |
#modify the object
|
1248 |
self.upload_object(self.account, |
1249 |
self.containers[1], |
1250 |
self.objects[0]['name'], |
1251 |
self.objects[0]['data'][:200]) |
1252 |
|
1253 |
for f in DATE_FORMATS: |
1254 |
past = t2.strftime(f) |
1255 |
|
1256 |
headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %past} |
1257 |
r = self.get_object(self.account, |
1258 |
self.containers[1], |
1259 |
self.objects[0]['name'], |
1260 |
**headers) |
1261 |
#assert get success
|
1262 |
self.assertEqual(r.status_code, 412) |
1263 |
|
1264 |
def test_hashes(self): |
1265 |
l = 8388609
|
1266 |
fname = 'largefile'
|
1267 |
o = self.upload_random_data(self.account, |
1268 |
self.containers[1], |
1269 |
fname, |
1270 |
l) |
1271 |
if o:
|
1272 |
r = self.get_object(self.account, |
1273 |
self.containers[1], |
1274 |
fname, |
1275 |
'json')
|
1276 |
body = json.loads(r.content) |
1277 |
hashes = body['hashes']
|
1278 |
block_size = body['block_size']
|
1279 |
block_hash = body['block_hash']
|
1280 |
block_num = l/block_size == 0 and l/block_size or l/block_size + 1 |
1281 |
self.assertTrue(len(hashes), block_num) |
1282 |
i = 0
|
1283 |
for h in hashes: |
1284 |
start = i * block_size |
1285 |
end = (i + 1) * block_size
|
1286 |
hash = compute_block_hash(o['data'][start:end], block_hash)
|
1287 |
self.assertEqual(h, hash) |
1288 |
i += 1
|
1289 |
|
1290 |
class ObjectPut(BaseTestCase): |
1291 |
def setUp(self): |
1292 |
BaseTestCase.setUp(self)
|
1293 |
self.account = 'test' |
1294 |
self.container = 'c1' |
1295 |
self.create_container(self.account, self.container) |
1296 |
|
1297 |
self.src = os.path.join('.', 'api', 'tests.py') |
1298 |
self.dest = os.path.join('.', 'api', 'chunked_update_test_file') |
1299 |
create_chunked_update_test_file(self.src, self.dest) |
1300 |
|
1301 |
def tearDown(self): |
1302 |
r = self.list_objects(self.account, self.container) |
1303 |
for o in get_content_splitted(r): |
1304 |
self.delete_object(self.account, self.container, o) |
1305 |
self.delete_container(self.account, self.container) |
1306 |
|
1307 |
# delete test file
|
1308 |
os.remove(self.dest)
|
1309 |
|
1310 |
def test_upload(self): |
1311 |
filename = 'tests.py'
|
1312 |
fullpath = os.path.join('.', 'api', filename) |
1313 |
f = open(fullpath, 'r') |
1314 |
data = f.read() |
1315 |
hash = compute_md5_hash(data) |
1316 |
meta={'HTTP_ETAG':hash, |
1317 |
'HTTP_X_OBJECT_META_TEST':'test1' |
1318 |
} |
1319 |
type, enc = mimetypes.guess_type(fullpath)
|
1320 |
meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text' |
1321 |
if enc:
|
1322 |
meta['HTTP_CONTENT_ENCODING'] = enc
|
1323 |
r = self.upload_object(self.account, |
1324 |
self.container,
|
1325 |
filename, |
1326 |
data, |
1327 |
content_type=meta['HTTP_CONTENT_TYPE'],
|
1328 |
**meta) |
1329 |
self.assertEqual(r.status_code, 201) |
1330 |
r = self.get_object_meta(self.account, self.container, filename) |
1331 |
self.assertTrue(r['X-Object-Meta-Test']) |
1332 |
self.assertEqual(r['X-Object-Meta-Test'], |
1333 |
meta['HTTP_X_OBJECT_META_TEST'])
|
1334 |
|
1335 |
#assert uploaded content
|
1336 |
r = self.get_object(self.account, self.container, filename) |
1337 |
self.assertEqual(os.path.getsize(fullpath), int(r['Content-Length'])) |
1338 |
self.assertEqual(data.strip(), r.content.strip())
|
1339 |
|
1340 |
def test_upload_unprocessable_entity(self): |
1341 |
filename = 'tests.py'
|
1342 |
fullpath = os.path.join('.', 'api', filename) |
1343 |
f = open(fullpath, 'r') |
1344 |
data = f.read() |
1345 |
meta={'HTTP_ETAG':'123', |
1346 |
'HTTP_X_OBJECT_META_TEST':'test1' |
1347 |
} |
1348 |
type, enc = mimetypes.guess_type(fullpath)
|
1349 |
meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text' |
1350 |
if enc:
|
1351 |
meta['HTTP_CONTENT_ENCODING'] = enc
|
1352 |
r = self.upload_object(self.account, |
1353 |
self.container,
|
1354 |
filename, |
1355 |
data, |
1356 |
content_type = meta['HTTP_CONTENT_TYPE'],
|
1357 |
**meta) |
1358 |
self.assertEqual(r.status_code, 422) |
1359 |
|
1360 |
def test_chucked_update(self): |
1361 |
objname = os.path.split(self.src)[-1:][0] |
1362 |
f = open(self.dest, 'r') |
1363 |
data = f.read() |
1364 |
meta = {} |
1365 |
type, enc = mimetypes.guess_type(self.dest) |
1366 |
meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text' |
1367 |
if enc:
|
1368 |
meta['HTTP_CONTENT_ENCODING'] = enc
|
1369 |
meta.update({'HTTP_TRANSFER_ENCODING':'chunked'}) |
1370 |
r = self.upload_object(self.account, |
1371 |
self.container,
|
1372 |
objname, |
1373 |
data, |
1374 |
content_type = 'plain/text',
|
1375 |
**meta) |
1376 |
self.assertEqual(r.status_code, 201) |
1377 |
|
1378 |
r = self.get_object(self.account, |
1379 |
self.container,
|
1380 |
objname) |
1381 |
uploaded_data = r.content |
1382 |
f = open(self.src, 'r') |
1383 |
actual_data = f.read() |
1384 |
self.assertEqual(actual_data, uploaded_data)
|
1385 |
|
1386 |
class ObjectCopy(BaseTestCase): |
1387 |
def setUp(self): |
1388 |
BaseTestCase.setUp(self)
|
1389 |
self.account = 'test' |
1390 |
self.containers = ['c1', 'c2'] |
1391 |
for c in self.containers: |
1392 |
self.create_container(self.account, c) |
1393 |
self.obj = self.upload_os_file(self.account, |
1394 |
self.containers[0], |
1395 |
'./api/tests.py')
|
1396 |
|
1397 |
def tearDown(self): |
1398 |
for c in self.containers: |
1399 |
for o in get_content_splitted(self.list_objects(self.account, c)): |
1400 |
self.delete_object(self.account, c, o) |
1401 |
self.delete_container(self.account, c) |
1402 |
|
1403 |
def test_copy(self): |
1404 |
with AssertInvariant(self.get_object_meta, self.account, |
1405 |
self.containers[0], self.obj['name']): |
1406 |
#perform copy
|
1407 |
meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'} |
1408 |
src_path = os.path.join('/', self.containers[0], self.obj['name']) |
1409 |
r = self.copy_object(self.account, |
1410 |
self.containers[0], |
1411 |
'testcopy',
|
1412 |
src_path, |
1413 |
**meta) |
1414 |
#assert copy success
|
1415 |
self.assertEqual(r.status_code, 201) |
1416 |
|
1417 |
#assert access the new object
|
1418 |
r = self.get_object_meta(self.account, |
1419 |
self.containers[0], |
1420 |
'testcopy')
|
1421 |
self.assertTrue(r['X-Object-Meta-Test']) |
1422 |
self.assertTrue(r['X-Object-Meta-Test'], 'testcopy') |
1423 |
|
1424 |
#assert etag is the same
|
1425 |
self.assertEqual(r['ETag'], self.obj['hash']) |
1426 |
|
1427 |
#assert src object still exists
|
1428 |
r = self.get_object_meta(self.account, self.containers[0], |
1429 |
self.obj['name']) |
1430 |
self.assertEqual(r.status_code, 200) |
1431 |
|
1432 |
def test_copy_from_different_container(self): |
1433 |
with AssertInvariant(self.get_object_meta, |
1434 |
self.account,
|
1435 |
self.containers[0], |
1436 |
self.obj['name']): |
1437 |
meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'} |
1438 |
src_path = os.path.join('/', self.containers[0], self.obj['name']) |
1439 |
r = self.copy_object(self.account, |
1440 |
self.containers[1], |
1441 |
'testcopy',
|
1442 |
src_path, |
1443 |
**meta) |
1444 |
self.assertEqual(r.status_code, 201) |
1445 |
|
1446 |
# assert updated metadata
|
1447 |
r = self.get_object_meta(self.account, |
1448 |
self.containers[1], |
1449 |
'testcopy')
|
1450 |
self.assertTrue(r['X-Object-Meta-Test']) |
1451 |
self.assertTrue(r['X-Object-Meta-Test'], 'testcopy') |
1452 |
|
1453 |
#assert src object still exists
|
1454 |
r = self.get_object_meta(self.account, self.containers[0], |
1455 |
self.obj['name']) |
1456 |
self.assertEqual(r.status_code, 200) |
1457 |
|
1458 |
def test_copy_invalid(self): |
1459 |
#copy from invalid object
|
1460 |
meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'} |
1461 |
r = self.copy_object(self.account, |
1462 |
self.containers[1], |
1463 |
'testcopy',
|
1464 |
os.path.join('/', self.containers[0], 'test.py'), |
1465 |
**meta) |
1466 |
self.assertItemNotFound(r)
|
1467 |
|
1468 |
#copy from invalid container
|
1469 |
meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'} |
1470 |
src_path = os.path.join('/', self.containers[1], self.obj['name']) |
1471 |
r = self.copy_object(self.account, |
1472 |
self.containers[1], |
1473 |
'testcopy',
|
1474 |
src_path, |
1475 |
**meta) |
1476 |
self.assertItemNotFound(r)
|
1477 |
|
1478 |
class ObjectMove(ObjectCopy): |
1479 |
def test_move(self): |
1480 |
#perform move
|
1481 |
meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'} |
1482 |
src_path = os.path.join('/', self.containers[0], self.obj['name']) |
1483 |
r = self.move_object(self.account, |
1484 |
self.containers[0], |
1485 |
'testcopy',
|
1486 |
src_path, |
1487 |
**meta) |
1488 |
#assert successful move
|
1489 |
self.assertEqual(r.status_code, 201) |
1490 |
|
1491 |
#assert updated metadata
|
1492 |
r = self.get_object_meta(self.account, |
1493 |
self.containers[0], |
1494 |
'testcopy')
|
1495 |
self.assertTrue(r['X-Object-Meta-Test']) |
1496 |
self.assertTrue(r['X-Object-Meta-Test'], 'testcopy') |
1497 |
|
1498 |
#assert src object no more exists
|
1499 |
r = self.get_object_meta(self.account, self.containers[0], |
1500 |
self.obj['name']) |
1501 |
self.assertItemNotFound(r)
|
1502 |
|
1503 |
class ObjectPost(BaseTestCase): |
1504 |
def setUp(self): |
1505 |
BaseTestCase.setUp(self)
|
1506 |
self.account = 'test' |
1507 |
self.containers = ['c1', 'c2'] |
1508 |
for c in self.containers: |
1509 |
self.create_container(self.account, c) |
1510 |
self.obj = self.upload_os_file(self.account, |
1511 |
self.containers[0], |
1512 |
'./api/tests.py')
|
1513 |
|
1514 |
def tearDown(self): |
1515 |
for c in self.containers: |
1516 |
for o in get_content_splitted(self.list_objects(self.account, c)): |
1517 |
self.delete_object(self.account, c, o) |
1518 |
self.delete_container(self.account, c) |
1519 |
|
1520 |
def test_update_meta(self): |
1521 |
#perform update metadata
|
1522 |
more = {'HTTP_X_OBJECT_META_FOO':'foo', |
1523 |
'HTTP_X_OBJECT_META_BAR':'bar'} |
1524 |
r = self.update_object(self.account, |
1525 |
self.containers[0], |
1526 |
self.obj['name'], |
1527 |
**more) |
1528 |
#assert request accepted
|
1529 |
self.assertEqual(r.status_code, 202) |
1530 |
|
1531 |
#assert old metadata are still there
|
1532 |
r = self.get_object_meta(self.account, self.containers[0], |
1533 |
self.obj['name']) |
1534 |
self.assertTrue('X-Object-Meta-Test' not in r.items()) |
1535 |
|
1536 |
#assert new metadata have been updated
|
1537 |
for k,v in more.items(): |
1538 |
key = '-'.join(elem.capitalize() for elem in k.split('_')[1:]) |
1539 |
self.assertTrue(r[key])
|
1540 |
self.assertTrue(r[key], v)
|
1541 |
|
1542 |
def test_update_object(self, |
1543 |
first_byte_pos=0,
|
1544 |
last_byte_pos=499,
|
1545 |
instance_length = True,
|
1546 |
content_length = 500):
|
1547 |
l = len(self.obj['data']) |
1548 |
length = instance_length and l or '*' |
1549 |
range = 'bytes %d-%d/%s' %(first_byte_pos,
|
1550 |
last_byte_pos, |
1551 |
length) |
1552 |
partial = last_byte_pos - first_byte_pos + 1
|
1553 |
data = get_random_data(partial) |
1554 |
more = {'HTTP_CONTENT_RANGE':range} |
1555 |
if content_length:
|
1556 |
more.update({'CONTENT_LENGTH':'%s' % content_length}) |
1557 |
|
1558 |
r = self.update_object(self.account, |
1559 |
self.containers[0], |
1560 |
self.obj['name'], |
1561 |
data, |
1562 |
'application/octet-stream',
|
1563 |
**more) |
1564 |
|
1565 |
if partial < 0 or (instance_length and l <= last_byte_pos): |
1566 |
self.assertEqual(r.status_code, 416) |
1567 |
elif content_length and content_length != partial: |
1568 |
self.assertEqual(r.status_code, 400) |
1569 |
else:
|
1570 |
self.assertEqual(r.status_code, 204) |
1571 |
|
1572 |
#check modified object
|
1573 |
r = self.get_object(self.account, |
1574 |
self.containers[0], |
1575 |
self.obj['name']) |
1576 |
self.assertEqual(r.content[0:partial], data) |
1577 |
self.assertEqual(r.content[partial:l], self.obj['data'][partial:l]) |
1578 |
|
1579 |
def test_update_object_no_content_length(self): |
1580 |
self.test_update_object(content_length = None) |
1581 |
|
1582 |
def test_update_object_invalid_content_length(self): |
1583 |
with AssertContentInvariant(self.get_object, self.account, self.containers[0], |
1584 |
self.obj['name']): |
1585 |
self.test_update_object(content_length = 1000) |
1586 |
|
1587 |
def test_update_object_with_unknown_instance_length(self): |
1588 |
self.test_update_object(instance_length = False) |
1589 |
|
1590 |
def test_update_object_invalid_range(self): |
1591 |
with AssertContentInvariant(self.get_object, self.account, self.containers[0], |
1592 |
self.obj['name']): |
1593 |
self.test_update_object(499, 0, True) |
1594 |
|
1595 |
def test_update_object_invalid_range_and_length(self): |
1596 |
with AssertContentInvariant(self.get_object, self.account, self.containers[0], |
1597 |
self.obj['name']): |
1598 |
self.test_update_object(499, 0, True, -1) |
1599 |
|
1600 |
def test_update_object_invalid_range_with_no_content_length(self): |
1601 |
with AssertContentInvariant(self.get_object, self.account, self.containers[0], |
1602 |
self.obj['name']): |
1603 |
self.test_update_object(499, 0, True, content_length = None) |
1604 |
|
1605 |
def test_update_object_out_of_limits(self): |
1606 |
with AssertContentInvariant(self.get_object, self.account, self.containers[0], |
1607 |
self.obj['name']): |
1608 |
l = len(self.obj['data']) |
1609 |
self.test_update_object(0, l+1, True) |
1610 |
|
1611 |
def test_append(self): |
1612 |
data = get_random_data(500)
|
1613 |
more = {'CONTENT_LENGTH':'500', |
1614 |
'HTTP_CONTENT_RANGE':'bytes */*'} |
1615 |
|
1616 |
r = self.update_object(self.account, |
1617 |
self.containers[0], |
1618 |
self.obj['name'], |
1619 |
data, |
1620 |
'application/octet-stream',
|
1621 |
**more) |
1622 |
|
1623 |
self.assertEqual(r.status_code, 204) |
1624 |
|
1625 |
r = self.get_object(self.account, |
1626 |
self.containers[0], |
1627 |
self.obj['name']) |
1628 |
self.assertEqual(len(r.content), len(self.obj['data']) + 500) |
1629 |
self.assertEqual(r.content[:-500], self.obj['data']) |
1630 |
|
1631 |
def test_update_with_chunked_transfer(self): |
1632 |
data, pure = create_random_chunked_data() |
1633 |
dl = len(pure)
|
1634 |
fl = len(self.obj['data']) |
1635 |
meta = {'HTTP_TRANSFER_ENCODING':'chunked', |
1636 |
'HTTP_CONTENT_RANGE':'bytes 0-/%d' %fl} |
1637 |
r = self.update_object(self.account, |
1638 |
self.containers[0], |
1639 |
self.obj['name'], |
1640 |
data, |
1641 |
'application/octet-stream',
|
1642 |
**meta) |
1643 |
|
1644 |
#check modified object
|
1645 |
r = self.get_object(self.account, |
1646 |
self.containers[0], |
1647 |
self.obj['name']) |
1648 |
self.assertEqual(r.content[0:dl], pure) |
1649 |
self.assertEqual(r.content[dl:fl], self.obj['data'][dl:fl]) |
1650 |
|
1651 |
def test_update_with_chunked_transfer_strict_range(self): |
1652 |
data, pure = create_random_chunked_data() |
1653 |
dl = len(pure) - 1 |
1654 |
fl = len(self.obj['data']) |
1655 |
meta = {'HTTP_TRANSFER_ENCODING':'chunked', |
1656 |
'HTTP_CONTENT_RANGE':'bytes 0-%d/%d' %(dl, fl)} |
1657 |
r = self.update_object(self.account, |
1658 |
self.containers[0], |
1659 |
self.obj['name'], |
1660 |
data, |
1661 |
'application/octet-stream',
|
1662 |
**meta) |
1663 |
|
1664 |
#check modified object
|
1665 |
r = self.get_object(self.account, |
1666 |
self.containers[0], |
1667 |
self.obj['name']) |
1668 |
self.assertEqual(r.content[0:dl+1], pure) |
1669 |
self.assertEqual(r.content[dl+1:fl], self.obj['data'][dl+1:fl]) |
1670 |
|
1671 |
class ObjectDelete(BaseTestCase): |
1672 |
def setUp(self): |
1673 |
BaseTestCase.setUp(self)
|
1674 |
self.account = 'test' |
1675 |
self.containers = ['c1', 'c2'] |
1676 |
for c in self.containers: |
1677 |
self.create_container(self.account, c) |
1678 |
self.obj = self.upload_os_file(self.account, |
1679 |
self.containers[0], |
1680 |
'./api/tests.py')
|
1681 |
|
1682 |
def tearDown(self): |
1683 |
for c in self.containers: |
1684 |
for o in get_content_splitted(self.list_objects(self.account, c)): |
1685 |
self.delete_object(self.account, c, o) |
1686 |
self.delete_container(self.account, c) |
1687 |
|
1688 |
def test_delete(self): |
1689 |
#perform delete object
|
1690 |
r = self.delete_object(self.account, self.containers[0], |
1691 |
self.obj['name']) |
1692 |
|
1693 |
#assert success
|
1694 |
self.assertEqual(r.status_code, 204) |
1695 |
|
1696 |
def test_delete_invalid(self): |
1697 |
#perform delete object
|
1698 |
r = self.delete_object(self.account, self.containers[1], |
1699 |
self.obj['name']) |
1700 |
|
1701 |
#assert failure
|
1702 |
self.assertItemNotFound(r)
|
1703 |
|
1704 |
class AssertInvariant(object): |
1705 |
def __init__(self, callable, *args, **kwargs): |
1706 |
self.callable = callable |
1707 |
self.args = args
|
1708 |
self.kwargs = kwargs
|
1709 |
|
1710 |
def __enter__(self): |
1711 |
self.items = self.callable(*self.args, **self.kwargs).items() |
1712 |
return self.items |
1713 |
|
1714 |
def __exit__(self, type, value, tb): |
1715 |
items = self.callable(*self.args, **self.kwargs).items() |
1716 |
assert self.items == items |
1717 |
|
1718 |
class AssertContentInvariant(object): |
1719 |
def __init__(self, callable, *args, **kwargs): |
1720 |
self.callable = callable |
1721 |
self.args = args
|
1722 |
self.kwargs = kwargs
|
1723 |
|
1724 |
def __enter__(self): |
1725 |
self.content = self.callable(*self.args, **self.kwargs).content |
1726 |
return self.content |
1727 |
|
1728 |
def __exit__(self, type, value, tb): |
1729 |
content = self.callable(*self.args, **self.kwargs).content |
1730 |
assert self.content == content |
1731 |
|
1732 |
def get_content_splitted(response): |
1733 |
if response:
|
1734 |
return response.content.split('\n') |
1735 |
|
1736 |
def compute_md5_hash(data): |
1737 |
md5 = hashlib.md5() |
1738 |
offset = 0
|
1739 |
md5.update(data) |
1740 |
return md5.hexdigest().lower()
|
1741 |
|
1742 |
def compute_block_hash(data, algorithm): |
1743 |
h = hashlib.new(algorithm) |
1744 |
h.update(data.rstrip('\x00'))
|
1745 |
return h.hexdigest()
|
1746 |
|
1747 |
def create_chunked_update_test_file(src, dest): |
1748 |
fr = open(src, 'r') |
1749 |
fw = open(dest, 'w') |
1750 |
data = fr.readline() |
1751 |
while data:
|
1752 |
fw.write(hex(len(data))) |
1753 |
fw.write('\r\n')
|
1754 |
fw.write(data) |
1755 |
data = fr.readline() |
1756 |
fw.write(hex(0)) |
1757 |
fw.write('\r\n')
|
1758 |
|
1759 |
def create_random_chunked_data(rows=5): |
1760 |
i = 0
|
1761 |
out = [] |
1762 |
pure= [] |
1763 |
while i < rows:
|
1764 |
data = get_random_data(random.randint(1, 100)) |
1765 |
out.append(hex(len(data))) |
1766 |
out.append(data) |
1767 |
pure.append(data) |
1768 |
i+=1
|
1769 |
out.append(hex(0)) |
1770 |
out.append('\r\n')
|
1771 |
return '\r\n'.join(out), ''.join(pure) |
1772 |
|
1773 |
def get_random_data(length=500): |
1774 |
char_set = string.ascii_uppercase + string.digits |
1775 |
return ''.join(random.choice(char_set) for x in range(length)) |
1776 |
|
1777 |
o_names = ['kate.jpg',
|
1778 |
'kate_beckinsale.jpg',
|
1779 |
'How To Win Friends And Influence People.pdf',
|
1780 |
'moms_birthday.jpg',
|
1781 |
'poodle_strut.mov',
|
1782 |
'Disturbed - Down With The Sickness.mp3',
|
1783 |
'army_of_darkness.avi',
|
1784 |
'the_mad.avi',
|
1785 |
'photos/animals/dogs/poodle.jpg',
|
1786 |
'photos/animals/dogs/terrier.jpg',
|
1787 |
'photos/animals/cats/persian.jpg',
|
1788 |
'photos/animals/cats/siamese.jpg',
|
1789 |
'photos/plants/fern.jpg',
|
1790 |
'photos/plants/rose.jpg',
|
1791 |
'photos/me.jpg']
|