1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from django.test.client import Client
35 from django.test import TestCase
36 from django.utils import simplejson as json
37 from xml.dom import minidom
45 from pithos.backends import backend
# strftime layouts used by the conditional-request tests to build
# If-Modified-Since / If-Unmodified-Since header values. In order they
# follow the asctime(), RFC 850 and RFC 1123 date layouts; the last one
# (index 2) is also used to parse the Last-Modified response header.
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
                "%A, %d-%b-%y %H:%M:%S GMT",
                "%a, %d %b %Y %H:%M:%S GMT"]
class AaiClient(Client):
    """Django test client that stamps every request with an auth token.

    The token is read from ``auth_token`` so that a subclass (or a single
    instance) can substitute another value without re-implementing
    ``request``.
    """

    # Value placed in HTTP_X_AUTH_TOKEN on every request.
    auth_token = '46e427d657b20defe352804f0eb6f8a2'

    def request(self, **request):
        """Add the auth token to ``request`` and delegate to ``Client.request``.

        Any ``HTTP_X_AUTH_TOKEN`` supplied by the caller is overwritten.
        """
        request['HTTP_X_AUTH_TOKEN'] = self.auth_token
        return super(AaiClient, self).request(**request)
56 class BaseTestCase(TestCase):
57 #TODO unauthorized request
59 self.client = AaiClient()
62 'X-Account-Container-Count',
63 'X-Account-Bytes-Used',
69 'X-Container-Object-Count',
70 'X-Container-Bytes-Used',
75 'X-Container-Block-Size',
76 'X-Container-Block-Hash',),
87 'X-Object-Version-Timestamp',)}
88 self.contentTypes = {'xml':'application/xml',
89 'json':'application/json',
104 self.return_codes = (400, 401, 404, 503,)
def assertFault(self, response, status_code, name):
    """Check that ``response`` carries the expected HTTP status code.

    ``name`` is the symbolic fault name passed by the callers; it is
    accepted but not compared against anything here.
    """
    actual = response.status_code
    expected = status_code
    self.assertEqual(actual, expected)
def assertBadRequest(self, response):
    """Expect ``response`` to be a 400 fault named 'badRequest'."""
    status, fault = 400, 'badRequest'
    self.assertFault(response, status, fault)
def assertItemNotFound(self, response):
    """Expect ``response`` to be a 404 fault named 'itemNotFound'."""
    status, fault = 404, 'itemNotFound'
    self.assertFault(response, status, fault)
def assertUnauthorized(self, response):
    """Expect ``response`` to be a 401 fault named 'unauthorized'."""
    status, fault = 401, 'unauthorized'
    self.assertFault(response, status, fault)
def assertServiceUnavailable(self, response):
    """Expect ``response`` to be a 503 fault named 'serviceUnavailable'."""
    status, fault = 503, 'serviceUnavailable'
    self.assertFault(response, status, fault)
def assertNonEmpty(self, response):
    """Expect ``response`` to be a 409 fault named 'nonEmpty'."""
    status, fault = 409, 'nonEmpty'
    self.assertFault(response, status, fault)
124 def assert_status(self, response, codes):
125 l = [elem for elem in self.return_codes]
126 if type(codes) == types.ListType:
130 self.assertTrue(response.status_code in l)
132 def get_account_meta(self, account, exp_meta={}):
133 path = '/v1/%s' % account
134 response = self.client.head(path)
135 self.assert_status(response, 204)
136 self.assert_headers(response, 'account', exp_meta)
139 def list_containers(self, account, limit=10000, marker='', format='',
143 params.pop('account')
144 path = '/v1/%s' % account
145 response = self.client.get(path, params, **headers)
146 self.assert_status(response, [200, 204, 304, 412])
147 response.content = response.content.strip()
149 self.assert_extended(response, format, 'container', limit)
151 names = get_content_splitted(response)
152 self.assertTrue(len(names) <= limit)
155 def update_account_meta(self, account, **metadata):
156 path = '/v1/%s' % account
157 response = self.client.post(path, **metadata)
158 response.content = response.content
159 self.assert_status(response, 202)
162 def get_container_meta(self, account, container, exp_meta={}):
165 params.pop('account')
166 params.pop('container')
167 path = '/v1/%s/%s' %(account, container)
168 response = self.client.head(path, params)
169 response.content = response.content
170 self.assert_status(response, 204)
171 if response.status_code == 204:
172 self.assert_headers(response, 'container', exp_meta)
175 def list_objects(self, account, container, limit=10000, marker='',
176 prefix='', format='', path='', delimiter='', meta='',
180 params.pop('account')
181 params.pop('container')
182 path = '/v1/%s/%s' % (account, container)
183 response = self.client.get(path, params, **headers)
184 response.content = response.content.strip()
186 self.assert_extended(response, format, 'object', limit)
187 self.assert_status(response, [200, 204, 304, 412])
190 def create_container(self, account, name, **meta):
191 path = '/v1/%s/%s' %(account, name)
192 response = self.client.put(path, **meta)
193 response.content = response.content
194 self.assert_status(response, [201, 202])
197 def update_container_meta(self, account, name, **meta):
198 path = '/v1/%s/%s' %(account, name)
199 response = self.client.post(path,
201 content_type='text/xml',
202 follow=False, **meta)
203 response.content = response.content
204 self.assert_status(response, 202)
207 def delete_container(self, account, container):
208 path = '/v1/%s/%s' %(account, container)
209 response = self.client.delete(path)
210 response.content = response.content
211 self.assert_status(response, [204, 409])
214 def get_object_meta(self, account, container, name):
215 path = '/v1/%s/%s/%s' %(account, container, name)
216 response = self.client.head(path)
217 response.content = response.content
218 self.assert_status(response, 200)
221 def get_object(self, account, container, name, format='', **headers):
222 path = '/v1/%s/%s/%s' %(account, container, name)
223 response = self.client.get(path, {'format':format}, **headers)
224 response.content = response.content
225 self.assert_status(response, [200, 206, 304, 412, 416])
226 if response.status_code in [200, 206]:
227 self.assert_headers(response, 'object')
230 def upload_object(self, account, container, name, data, content_type='',
232 path = '/v1/%s/%s/%s' %(account, container, name)
233 response = self.client.put(path, data, content_type, **headers)
234 response.content = response.content
235 self.assert_status(response, [201, 411, 422])
236 if response.status_code == 201:
237 self.assertTrue(response['Etag'])
240 def copy_object(self, account, container, name, src, **headers):
241 path = '/v1/%s/%s/%s' %(account, container, name)
242 headers['HTTP_X_COPY_FROM'] = src
243 response = self.client.put(path, **headers)
244 response.content = response.content
245 self.assert_status(response, 201)
248 def move_object(self, account, container, name, src, **headers):
249 path = '/v1/%s/%s/%s' % (account, container, name)
250 headers['HTTP_X_MOVE_FROM'] = src
251 response = self.client.put(path, **headers)
252 response.content = response.content
253 self.assert_status(response, 201)
256 def update_object(self, account, container, name, data={},
257 content_type='MULTIPART_CONTENT', **headers):
258 path = '/v1/%s/%s/%s' %(account, container, name)
259 response = self.client.post(path, data, content_type, **headers)
260 response.content = response.content
261 self.assert_status(response, [202, 204, 416])
264 def delete_object(self, account, container, name):
265 path = '/v1/%s/%s/%s' %(account, container, name)
266 response = self.client.delete(path)
267 response.content = response.content
268 self.assert_status(response, 204)
271 def assert_headers(self, response, type, exp_meta={}):
272 entities = ['Account', 'Container', 'Container-Object', 'Object']
273 user_defined_meta = ['X-%s-Meta' %elem for elem in entities]
274 headers = [item for item in response._headers.values()]
275 t = tuple(user_defined_meta)
276 system_headers = [h for h in headers if not h[0].startswith(t)]
277 for h in system_headers:
278 self.assertTrue(h[0] in self.headers[type])
280 self.assertEqual(h[1], exp_meta[h[0]])
282 def assert_extended(self, response, format, type, size):
283 exp_content_type = self.contentTypes[format]
284 self.assertEqual(response['Content-Type'].find(exp_content_type), 0)
286 self.assert_xml(response, type, size)
287 elif format == 'json':
288 self.assert_json(response, type, size)
290 def assert_json(self, response, type, size):
291 convert = lambda s: s.lower()
292 info = [convert(elem) for elem in self.extended[type]]
293 data = json.loads(response.content)
294 self.assertTrue(len(data) <= size)
297 if 'subdir' in i.keys():
299 self.assertTrue(item in i.keys())
301 def assert_xml(self, response, type, size):
302 convert = lambda s: s.lower()
303 info = [convert(elem) for elem in self.extended[type]]
305 info.remove('content_encoding')
308 xml = minidom.parseString(response.content)
310 nodes = xml.getElementsByTagName(item)
311 self.assertTrue(nodes)
312 self.assertTrue(len(nodes) <= size)
315 def upload_os_file(self, account, container, fullpath, meta={}):
317 f = open(fullpath, 'r')
319 name = os.path.split(fullpath)[-1]
320 return self.upload_data(account, container, name, data)
324 def upload_random_data(self, account, container, name, length=1024,
326 data = get_random_data(length)
327 return self.upload_data(account, container, name, data, meta)
329 def upload_data(self, account, container, name, data, meta={}):
334 obj['hash'] = compute_md5_hash(obj['data'])
335 meta.update({'HTTP_X_OBJECT_META_TEST':'test1',
336 'HTTP_ETAG':obj['hash']})
337 type, enc = mimetypes.guess_type(name)
338 meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text'
340 meta['HTTP_CONTENT_ENCODING'] = enc
343 r = self.upload_object(account,
347 meta['HTTP_CONTENT_TYPE'],
349 if r.status_code == 201:
354 class AccountHead(BaseTestCase):
356 BaseTestCase.setUp(self)
357 self.account = 'test'
358 self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
359 for item in self.containers:
360 self.create_container(self.account, item)
363 for c in get_content_splitted(self.list_containers(self.account)):
364 self.delete_container(self.account, c)
366 def test_get_account_meta(self):
367 response = self.get_account_meta(self.account)
368 r2 = self.list_containers(self.account)
369 containers = get_content_splitted(r2)
370 l = str(len(containers))
371 self.assertEqual(response['X-Account-Container-Count'], l)
374 r = self.get_container_meta(self.account, c)
375 size = size + int(r['X-Container-Bytes-Used'])
376 self.assertEqual(response['X-Account-Bytes-Used'], str(size))
378 #def test_get_account_401(self):
379 # response = self.get_account_meta('non-existing-account')
381 # self.assertEqual(response.status_code, 401)
383 class AccountGet(BaseTestCase):
385 BaseTestCase.setUp(self)
386 self.account = 'test'
387 #create some containers
388 self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
389 for item in self.containers:
390 self.create_container(self.account, item)
393 for c in get_content_splitted(self.list_containers(self.account)):
394 response = self.delete_container(self.account, c)
398 response = self.list_containers(self.account)
399 containers = get_content_splitted(response)
400 self.assertEquals(self.containers, containers)
402 #def test_list_204(self):
403 # response = self.list_containers('non-existing-account')
404 # self.assertEqual(response.status_code, 204)
406 def test_list_with_limit(self):
408 response = self.list_containers(self.account, limit=limit)
409 containers = get_content_splitted(response)
410 self.assertEquals(len(containers), limit)
411 self.assertEquals(self.containers[:2], containers)
413 def test_list_with_marker(self):
416 response = self.list_containers(self.account, limit=l, marker=m)
417 containers = get_content_splitted(response)
418 i = self.containers.index(m) + 1
419 self.assertEquals(self.containers[i:(i+l)], containers)
422 response = self.list_containers(self.account, limit=l, marker=m)
423 containers = get_content_splitted(response)
424 i = self.containers.index(m) + 1
425 self.assertEquals(self.containers[i:(i+l)], containers)
427 #def test_extended_list(self):
428 # self.list_containers(self.account, limit=3, format='xml')
429 # self.list_containers(self.account, limit=3, format='json')
431 def test_list_json_with_marker(self):
434 response = self.list_containers(self.account, limit=l, marker=m,
436 containers = json.loads(response.content)
437 self.assertEqual(containers[0]['name'], 'kiwis')
438 self.assertEqual(containers[1]['name'], 'oranges')
440 def test_list_xml_with_marker(self):
443 response = self.list_containers(self.account, limit=l, marker=m,
445 xml = minidom.parseString(response.content)
446 nodes = xml.getElementsByTagName('name')
447 self.assertEqual(len(nodes), 1)
448 self.assertEqual(nodes[0].childNodes[0].data, 'pears')
450 def test_if_modified_since(self):
451 t = datetime.datetime.utcnow()
452 t2 = t - datetime.timedelta(minutes=10)
455 self.create_container(self.account,
458 for f in DATE_FORMATS:
459 past = t2.strftime(f)
461 headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %past}
462 r = self.list_containers(self.account, **headers)
465 self.assertEqual(r.status_code, 200)
467 def test_if_modified_since_invalid_date(self):
468 headers = {'HTTP_IF_MODIFIED_SINCE':''}
469 r = self.list_containers(self.account, **headers)
472 self.assertEqual(r.status_code, 200)
474 def test_if_not_modified_since(self):
475 now = datetime.datetime.utcnow()
476 since = now + datetime.timedelta(1)
478 for f in DATE_FORMATS:
479 headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %since.strftime(f)}
480 r = self.list_containers(self.account, **headers)
483 self.assertEqual(r.status_code, 304)
485 def test_if_unmodified_since(self):
486 now = datetime.datetime.utcnow()
487 since = now + datetime.timedelta(1)
489 for f in DATE_FORMATS:
490 headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %since.strftime(f)}
491 r = self.list_containers(self.account, **headers)
494 self.assertEqual(r.status_code, 200)
495 self.assertEqual(self.containers, get_content_splitted(r))
497 def test_if_unmodified_since_precondition_failed(self):
498 t = datetime.datetime.utcnow()
499 t2 = t - datetime.timedelta(minutes=10)
502 self.create_container(self.account,
505 for f in DATE_FORMATS:
506 past = t2.strftime(f)
508 headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %past}
509 r = self.list_containers(self.account, **headers)
512 self.assertEqual(r.status_code, 412)
514 class AccountPost(BaseTestCase):
516 BaseTestCase.setUp(self)
517 self.account = 'test'
518 self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
519 for item in self.containers:
520 self.create_container(self.account, item)
523 for c in get_content_splitted(self.list_containers(self.account)):
524 self.delete_container(self.account, c)
526 def test_update_meta(self):
527 meta = {'HTTP_X_ACCOUNT_META_TEST':'test',
528 'HTTP_X_ACCOUNT_META_TOST':'tost'}
529 response = self.update_account_meta(self.account, **meta)
530 response = self.get_account_meta(self.account)
531 for k,v in meta.items():
532 key = '-'.join(elem.capitalize() for elem in k.split('_')[1:])
533 self.assertTrue(response[key])
534 self.assertEqual(response[key], v)
536 #def test_invalid_account_update_meta(self):
537 # with AssertInvariant(self.get_account_meta, self.account):
538 # meta = {'HTTP_X_ACCOUNT_META_TEST':'test',
539 # 'HTTP_X_ACCOUNT_META_TOST':'tost'}
540 # response = self.update_account_meta('non-existing-account', **meta)
542 class ContainerHead(BaseTestCase):
544 BaseTestCase.setUp(self)
545 self.account = 'test'
546 self.container = 'apples'
547 self.create_container(self.account, self.container)
550 for o in self.list_objects(self.account, self.container):
551 self.delete_object(self.account, self.container, o)
552 self.delete_container(self.account, self.container)
554 def test_get_meta(self):
555 headers = {'HTTP_X_OBJECT_META_TRASH':'true'}
556 t1 = datetime.datetime.utcnow()
557 o = self.upload_random_data(self.account,
562 r = self.get_container_meta(self.account,
564 self.assertEqual(r['X-Container-Object-Count'], '1')
565 self.assertEqual(r['X-Container-Bytes-Used'], str(len(o['data'])))
566 t2 = datetime.datetime.strptime(r['Last-Modified'], DATE_FORMATS[2])
568 threashold = datetime.timedelta(seconds=1)
569 self.assertTrue(delta < threashold)
570 self.assertTrue(r['X-Container-Object-Meta'])
571 self.assertTrue('Trash' in r['X-Container-Object-Meta'])
573 class ContainerGet(BaseTestCase):
575 BaseTestCase.setUp(self)
576 self.account = 'test'
577 self.container = ['pears', 'apples']
578 for c in self.container:
579 self.create_container(self.account, c)
581 for o in o_names[:8]:
582 self.obj.append(self.upload_random_data(self.account,
585 for o in o_names[8:]:
586 self.obj.append(self.upload_random_data(self.account,
591 for c in self.container:
592 for obj in get_content_splitted(self.list_objects(self.account, c)):
593 self.delete_object(self.account, c, obj)
594 self.delete_container(self.account, c)
596 def test_list_objects(self):
597 response = self.list_objects(self.account, self.container[0])
598 objects = get_content_splitted(response)
599 l = [elem['name'] for elem in self.obj[:8]]
601 self.assertEqual(objects, l)
603 def test_list_objects_with_limit_marker(self):
604 response = self.list_objects(self.account, self.container[0], limit=2)
605 objects = get_content_splitted(response)
606 l = [elem['name'] for elem in self.obj[:8]]
608 self.assertEqual(objects, l[:2])
610 markers = ['How To Win Friends And Influence People.pdf',
614 response = self.list_objects(self.account, self.container[0],
615 limit=limit, marker=m)
616 objects = get_content_splitted(response)
617 l = [elem['name'] for elem in self.obj[:8]]
619 start = l.index(m) + 1
621 end = len(l) >= end and end or len(l)
622 self.assertEqual(objects, l[start:end])
624 def test_list_pseudo_hierarchical_folders(self):
625 response = self.list_objects(self.account, self.container[1],
626 prefix='photos', delimiter='/')
627 objects = get_content_splitted(response)
628 self.assertEquals(['photos/animals/', 'photos/me.jpg',
629 'photos/plants/'], objects)
631 response = self.list_objects(self.account, self.container[1],
632 prefix='photos/animals', delimiter='/')
633 objs = get_content_splitted(response)
634 l = ['photos/animals/cats/', 'photos/animals/dogs/']
635 self.assertEquals(l, objs)
637 response = self.list_objects(self.account, self.container[1],
639 objects = get_content_splitted(response)
640 self.assertEquals(['photos/me.jpg'], objects)
642 def test_extended_list_json(self):
643 response = self.list_objects(self.account,
645 format='json', limit=2,
646 prefix='photos/animals',
648 objects = json.loads(response.content)
649 self.assertEqual(objects[0]['subdir'], 'photos/animals/cats/')
650 self.assertEqual(objects[1]['subdir'], 'photos/animals/dogs/')
652 def test_extended_list_xml(self):
653 response = self.list_objects(self.account, self.container[1],
654 format='xml', limit=4, prefix='photos',
656 xml = minidom.parseString(response.content)
657 dirs = xml.getElementsByTagName('subdir')
658 self.assertEqual(len(dirs), 2)
659 self.assertEqual(dirs[0].attributes['name'].value, 'photos/animals/')
660 self.assertEqual(dirs[1].attributes['name'].value, 'photos/plants/')
662 objects = xml.getElementsByTagName('name')
663 self.assertEqual(len(objects), 1)
664 self.assertEqual(objects[0].childNodes[0].data, 'photos/me.jpg')
666 def test_list_meta_double_matching(self):
667 meta = {'HTTP_X_OBJECT_META_QUALITY':'aaa',
668 'HTTP_X_OBJECT_META_STOCK':'true'}
669 r = self.update_object(self.account,
673 r = self.list_objects(self.account,
675 meta='Quality,Stock')
676 self.assertEqual(r.status_code, 200)
677 obj = get_content_splitted(r)
678 self.assertEqual(len(obj), 1)
679 self.assertTrue(obj, self.obj[0]['name'])
681 def test_list_using_meta(self):
682 meta = {'HTTP_X_OBJECT_META_QUALITY':'aaa'}
683 for o in self.obj[:2]:
684 r = self.update_object(self.account,
688 meta = {'HTTP_X_OBJECT_META_STOCK':'true'}
689 for o in self.obj[3:5]:
690 r = self.update_object(self.account,
695 r = self.list_objects(self.account,
698 self.assertEqual(r.status_code, 200)
699 obj = get_content_splitted(r)
700 self.assertEqual(len(obj), 2)
701 self.assertTrue(obj, [o['name'] for o in self.obj[:2]])
703 # test case insensitive
704 r = self.list_objects(self.account,
707 self.assertEqual(r.status_code, 200)
708 obj = get_content_splitted(r)
709 self.assertEqual(len(obj), 2)
710 self.assertTrue(obj, [o['name'] for o in self.obj[:2]])
712 # test multiple matches
713 r = self.list_objects(self.account,
715 meta='Quality,Stock')
716 self.assertEqual(r.status_code, 200)
717 obj = get_content_splitted(r)
718 self.assertEqual(len(obj), 4)
719 self.assertTrue(obj, [o['name'] for o in self.obj[:4]])
721 # test non 1-1 multiple match
722 r = self.list_objects(self.account,
725 self.assertEqual(r.status_code, 200)
726 obj = get_content_splitted(r)
727 self.assertEqual(len(obj), 2)
728 self.assertTrue(obj, [o['name'] for o in self.obj[:2]])
730 def test_if_modified_since(self):
731 t = datetime.datetime.utcnow()
732 t2 = t - datetime.timedelta(minutes=10)
735 self.upload_random_data(self.account,
739 for f in DATE_FORMATS:
740 past = t2.strftime(f)
742 headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %past}
743 r = self.list_objects(self.account,
744 self.container[0], **headers)
747 self.assertEqual(r.status_code, 200)
749 def test_if_modified_since_invalid_date(self):
750 headers = {'HTTP_IF_MODIFIED_SINCE':''}
751 r = self.list_objects(self.account,
752 self.container[0], **headers)
755 self.assertEqual(r.status_code, 200)
757 def test_if_not_modified_since(self):
758 now = datetime.datetime.utcnow()
759 since = now + datetime.timedelta(1)
761 for f in DATE_FORMATS:
762 headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %since.strftime(f)}
763 r = self.list_objects(self.account,
764 self.container[0], **headers)
767 self.assertEqual(r.status_code, 304)
769 def test_if_unmodified_since(self):
770 now = datetime.datetime.utcnow()
771 since = now + datetime.timedelta(1)
773 for f in DATE_FORMATS:
774 headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %since.strftime(f)}
775 r = self.list_objects(self.account,
776 self.container[0], **headers)
779 self.assertEqual(r.status_code, 200)
780 objlist = self.list_objects(self.account, self.container[0])
781 self.assertEqual(get_content_splitted(r),
782 get_content_splitted(objlist))
784 def test_if_unmodified_since_precondition_failed(self):
785 t = datetime.datetime.utcnow()
786 t2 = t - datetime.timedelta(minutes=10)
789 self.create_container(self.account,
792 for f in DATE_FORMATS:
793 past = t2.strftime(f)
795 headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %past}
796 r = self.list_objects(self.account,
797 self.container[0], **headers)
800 self.assertEqual(r.status_code, 412)
802 class ContainerPut(BaseTestCase):
804 BaseTestCase.setUp(self)
805 self.account = 'test'
806 self.containers = ['c1', 'c2']
809 for c in self.containers:
810 r = self.delete_container(self.account, c)
def test_create(self):
    # Create the first container; when the reply is 201, check that the
    # container appears in the account listing and that reading its
    # metadata answers 204.
    # NOTE(review): indentation was lost in this copy of the file; all of
    # the checks are assumed to sit under the 201 branch -- confirm against
    # the original source.
    response = self.create_container(self.account, self.containers[0])
    if response.status_code == 201:
        response = self.list_containers(self.account)
        content = get_content_splitted(response)
        self.assertTrue(self.containers[0] in content)
        r = self.get_container_meta(self.account, self.containers[0])
        self.assertEqual(r.status_code, 204)
821 def test_create_twice(self):
822 response = self.create_container(self.account, self.containers[0])
823 if response.status_code == 201:
824 r = self.create_container(self.account, self.containers[0])
825 self.assertTrue(r.status_code, 202)
827 class ContainerPost(BaseTestCase):
829 BaseTestCase.setUp(self)
830 self.account = 'test'
831 self.container = 'apples'
832 self.create_container(self.account, self.container)
835 for o in self.list_objects(self.account, self.container):
836 self.delete_object(self.account, self.container, o)
837 self.delete_container(self.account, self.container)
839 def test_update_meta(self):
840 meta = {'HTTP_X_CONTAINER_META_TEST':'test33',
841 'HTTP_X_CONTAINER_META_TOST':'tost22'}
842 response = self.update_container_meta(self.account, self.container,
844 response = self.get_container_meta(self.account, self.container)
845 for k,v in meta.items():
846 key = '-'.join(elem.capitalize() for elem in k.split('_')[1:])
847 self.assertTrue(response[key])
848 self.assertEqual(response[key], v)
850 class ContainerDelete(BaseTestCase):
852 BaseTestCase.setUp(self)
853 self.account = 'test'
854 self.containers = ['c1', 'c2']
855 for c in self.containers:
856 self.create_container(self.account, c)
857 self.upload_random_data(self.account,
862 for c in self.containers:
863 for o in get_content_splitted(self.list_objects(self.account, c)):
864 self.delete_object(self.account, c, o)
865 self.delete_container(self.account, c)
867 def test_delete(self):
868 r = self.delete_container(self.account, self.containers[0])
869 self.assertEqual(r.status_code, 204)
871 def test_delete_non_empty(self):
872 r = self.delete_container(self.account, self.containers[1])
873 self.assertNonEmpty(r)
875 def test_delete_invalid(self):
876 self.assertItemNotFound(self.delete_container(self.account, 'c3'))
878 class ObjectHead(BaseTestCase):
881 class ObjectGet(BaseTestCase):
883 BaseTestCase.setUp(self)
884 self.account = 'test'
885 self.containers = ['c1', 'c2']
886 #create some containers
887 for c in self.containers:
888 self.create_container(self.account, c)
892 self.objects.append(self.upload_os_file(self.account,
895 self.objects.append(self.upload_os_file(self.account,
900 for c in self.containers:
901 for o in get_content_splitted(self.list_objects(self.account, c)):
902 self.delete_object(self.account, c, o)
903 self.delete_container(self.account, c)
907 r = self.get_object(self.account,
909 self.objects[0]['name'],
910 self.objects[0]['meta'])
912 self.assertEqual(r.status_code, 200)
915 self.assertEqual(r['Content-Type'],
916 self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
918 def test_get_invalid(self):
919 r = self.get_object(self.account,
921 self.objects[0]['name'])
922 self.assertItemNotFound(r)
924 def test_get_partial(self):
925 #perform get with range
926 headers = {'HTTP_RANGE':'bytes=0-499'}
927 r = self.get_object(self.account,
929 self.objects[0]['name'],
932 #assert successful partial content
933 self.assertEqual(r.status_code, 206)
936 self.assertEqual(r['Content-Type'],
937 self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
939 #assert content length
940 self.assertEqual(int(r['Content-Length']), 500)
943 self.assertEqual(self.objects[0]['data'][:500], r.content)
945 def test_get_final_500(self):
946 #perform get with range
947 headers = {'HTTP_RANGE':'bytes=-500'}
948 r = self.get_object(self.account,
950 self.objects[0]['name'],
953 #assert successful partial content
954 self.assertEqual(r.status_code, 206)
957 self.assertEqual(r['Content-Type'],
958 self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
960 #assert content length
961 self.assertEqual(int(r['Content-Length']), 500)
964 self.assertTrue(self.objects[0]['data'][-500:], r.content)
966 def test_get_rest(self):
967 #perform get with range
968 offset = len(self.objects[0]['data']) - 500
969 headers = {'HTTP_RANGE':'bytes=%s-' %offset}
970 r = self.get_object(self.account,
972 self.objects[0]['name'],
975 #assert successful partial content
976 self.assertEqual(r.status_code, 206)
979 self.assertEqual(r['Content-Type'],
980 self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
982 #assert content length
983 self.assertEqual(int(r['Content-Length']), 500)
986 self.assertTrue(self.objects[0]['data'][-500:], r.content)
988 def test_get_range_not_satisfiable(self):
989 #perform get with range
990 offset = len(self.objects[0]['data']) + 1
991 headers = {'HTTP_RANGE':'bytes=0-%s' %offset}
992 r = self.get_object(self.account,
994 self.objects[0]['name'],
997 #assert Range Not Satisfiable
998 self.assertEqual(r.status_code, 416)
1000 def test_multiple_range(self):
1001 #perform get with multiple range
1002 ranges = ['0-499', '-500', '1000-']
1003 headers = {'HTTP_RANGE' : 'bytes=%s' % ','.join(ranges)}
1004 r = self.get_object(self.account,
1006 self.objects[0]['name'],
1009 # assert partial content
1010 self.assertEqual(r.status_code, 206)
1012 # assert Content-Type of the reply will be multipart/byteranges
1013 self.assertTrue(r['Content-Type'])
1014 content_type_parts = r['Content-Type'].split()
1015 self.assertEqual(content_type_parts[0], ('multipart/byteranges;'))
1017 boundary = '--%s' %content_type_parts[1].split('=')[-1:][0]
1018 cparts = r.content.split(boundary)[1:-1]
1020 # assert content parts are exactly 2
1021 self.assertEqual(len(cparts), len(ranges))
1023 # for each content part assert headers
1025 for cpart in cparts:
1026 content = cpart.split('\r\n')
1027 headers = content[1:3]
1028 content_range = headers[0].split(': ')
1029 self.assertEqual(content_range[0], 'Content-Range')
1031 r = ranges[i].split('-')
1032 if not r[0] and not r[1]:
1035 start = len(self.objects[0]['data']) - int(r[1])
1036 end = len(self.objects[0]['data'])
1039 end = len(self.objects[0]['data'])
1043 fdata = self.objects[0]['data'][start:end]
1044 sdata = '\r\n'.join(content[4:-1])
1045 self.assertEqual(len(fdata), len(sdata))
1046 self.assertEquals(fdata, sdata)
1049 def test_multiple_range_not_satisfiable(self):
1050 #perform get with multiple range
1051 out_of_range = len(self.objects[0]['data']) + 1
1052 ranges = ['0-499', '-500', '%d-' %out_of_range]
1053 headers = {'HTTP_RANGE' : 'bytes=%s' % ','.join(ranges)}
1054 r = self.get_object(self.account,
1056 self.objects[0]['name'],
1059 # assert partial content
1060 self.assertEqual(r.status_code, 416)
1062 def test_get_with_if_match(self):
1063 #perform get with If-Match
1064 headers = {'HTTP_IF_MATCH':self.objects[0]['hash']}
1065 r = self.get_object(self.account,
1067 self.objects[0]['name'],
1070 self.assertEqual(r.status_code, 200)
1072 #assert content-type
1073 self.assertEqual(r['Content-Type'],
1074 self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1076 #assert response content
1077 self.assertEqual(self.objects[0]['data'].strip(), r.content.strip())
1079 def test_get_with_if_match_star(self):
1080 #perform get with If-Match *
1081 headers = {'HTTP_IF_MATCH':'*'}
1082 r = self.get_object(self.account,
1084 self.objects[0]['name'],
1087 self.assertEqual(r.status_code, 200)
1089 #assert content-type
1090 self.assertEqual(r['Content-Type'],
1091 self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1093 #assert response content
1094 self.assertEqual(self.objects[0]['data'].strip(), r.content.strip())
1096 def test_get_with_multiple_if_match(self):
1097 #perform get with If-Match
1098 etags = [i['hash'] for i in self.objects if i]
1099 etags = ','.join('"%s"' % etag for etag in etags)
1100 headers = {'HTTP_IF_MATCH':etags}
1101 r = self.get_object(self.account,
1103 self.objects[0]['name'],
1106 self.assertEqual(r.status_code, 200)
1108 #assert content-type
1109 self.assertEqual(r['Content-Type'],
1110 self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1112 #assert content-type
1113 self.assertEqual(r['Content-Type'],
1114 self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1116 #assert response content
1117 self.assertEqual(self.objects[0]['data'].strip(), r.content.strip())
1119 def test_if_match_precondition_failed(self):
1120 #perform get with If-Match
1121 headers = {'HTTP_IF_MATCH':'123'}
1122 r = self.get_object(self.account,
1124 self.objects[0]['name'],
1126 #assert precondition failed
1127 self.assertEqual(r.status_code, 412)
1129 def test_if_none_match(self):
1130 #perform get with If-None-Match
1131 headers = {'HTTP_IF_NONE_MATCH':'123'}
1132 r = self.get_object(self.account,
1134 self.objects[0]['name'],
1138 self.assertEqual(r.status_code, 200)
1140 #assert content-type
1141 self.assertEqual(r['Content-Type'],
1142 self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1144 def test_if_none_match(self):
1145 #perform get with If-None-Match *
1146 headers = {'HTTP_IF_NONE_MATCH':'*'}
1147 r = self.get_object(self.account,
1149 self.objects[0]['name'],
1153 self.assertEqual(r.status_code, 304)
1155 def test_if_none_match_not_modified(self):
1156 #perform get with If-None-Match
1157 headers = {'HTTP_IF_NONE_MATCH':'%s' %self.objects[0]['hash']}
1158 r = self.get_object(self.account,
1160 self.objects[0]['name'],
1163 #assert not modified
1164 self.assertEqual(r.status_code, 304)
1165 self.assertEqual(r['ETag'], self.objects[0]['hash'])
1167 def test_if_modified_since(self):
1168 t = datetime.datetime.utcnow()
1169 t2 = t - datetime.timedelta(minutes=10)
1172 self.upload_object(self.account,
1174 self.objects[0]['name'],
1175 self.objects[0]['data'][:200])
1177 for f in DATE_FORMATS:
1178 past = t2.strftime(f)
1180 headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %past}
1181 r = self.get_object(self.account,
1183 self.objects[0]['name'],
1187 self.assertEqual(r.status_code, 200)
1189 #assert content-type
1190 self.assertEqual(r['Content-Type'],
1191 self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1193 def test_if_modified_since_invalid_date(self):
1194 headers = {'HTTP_IF_MODIFIED_SINCE':''}
1195 r = self.get_object(self.account,
1197 self.objects[0]['name'],
1201 self.assertEqual(r.status_code, 200)
1203 #assert content-type
1204 self.assertEqual(r['Content-Type'],
1205 self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1207 def test_if_not_modified_since(self):
1208 now = datetime.datetime.utcnow()
1209 since = now + datetime.timedelta(1)
1211 for f in DATE_FORMATS:
1212 headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %since.strftime(f)}
1213 r = self.get_object(self.account,
1215 self.objects[0]['name'],
1218 #assert not modified
1219 self.assertEqual(r.status_code, 304)
1221 def test_if_unmodified_since(self):
1222 now = datetime.datetime.utcnow()
1223 since = now + datetime.timedelta(1)
1225 for f in DATE_FORMATS:
1226 headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %since.strftime(f)}
1227 r = self.get_object(self.account,
1229 self.objects[0]['name'],
1232 self.assertEqual(r.status_code, 200)
1233 self.assertEqual(self.objects[0]['data'], r.content)
1235 #assert content-type
1236 self.assertEqual(r['Content-Type'],
1237 self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1239 def test_if_unmodified_since_precondition_failed(self):
1240 t = datetime.datetime.utcnow()
1241 t2 = t - datetime.timedelta(minutes=10)
1244 self.upload_object(self.account,
1246 self.objects[0]['name'],
1247 self.objects[0]['data'][:200])
1249 for f in DATE_FORMATS:
1250 past = t2.strftime(f)
1252 headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %past}
1253 r = self.get_object(self.account,
1255 self.objects[0]['name'],
1258 self.assertEqual(r.status_code, 412)
1260 def test_hashes(self):
1263 o = self.upload_random_data(self.account,
1268 r = self.get_object(self.account,
1272 body = json.loads(r.content)
1273 hashes = body['hashes']
1274 block_size = body['block_size']
1275 block_hash = body['block_hash']
1276 block_num = l/block_size == 0 and l/block_size or l/block_size + 1
1277 self.assertTrue(len(hashes), block_num)
1280 start = i * block_size
1281 end = (i + 1) * block_size
1282 hash = compute_block_hash(o['data'][start:end], block_hash)
1283 self.assertEqual(h, hash)
1286 class ObjectPut(BaseTestCase):
1288 BaseTestCase.setUp(self)
1289 self.account = 'test'
1290 self.container = 'c1'
1291 self.create_container(self.account, self.container)
1293 self.src = os.path.join('.', 'api', 'tests.py')
1294 self.dest = os.path.join('.', 'api', 'chunked_update_test_file')
1295 create_chunked_update_test_file(self.src, self.dest)
1298 r = self.list_objects(self.account, self.container)
1299 for o in get_content_splitted(r):
1300 self.delete_object(self.account, self.container, o)
1301 self.delete_container(self.account, self.container)
1304 os.remove(self.dest)
1306 def test_upload(self):
1307 filename = 'tests.py'
1308 fullpath = os.path.join('.', 'api', filename)
1309 f = open(fullpath, 'r')
1311 hash = compute_md5_hash(data)
1312 meta={'HTTP_ETAG':hash,
1313 'HTTP_X_OBJECT_META_TEST':'test1'
1315 type, enc = mimetypes.guess_type(fullpath)
1316 meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text'
1318 meta['HTTP_CONTENT_ENCODING'] = enc
1319 r = self.upload_object(self.account,
1323 content_type=meta['HTTP_CONTENT_TYPE'],
1325 self.assertEqual(r.status_code, 201)
1326 r = self.get_object_meta(self.account, self.container, filename)
1327 self.assertTrue(r['X-Object-Meta-Test'])
1328 self.assertEqual(r['X-Object-Meta-Test'],
1329 meta['HTTP_X_OBJECT_META_TEST'])
1331 #assert uploaded content
1332 r = self.get_object(self.account, self.container, filename)
1333 self.assertEqual(os.path.getsize(fullpath), int(r['Content-Length']))
1334 self.assertEqual(data.strip(), r.content.strip())
1336 def test_upload_unprocessable_entity(self):
1337 filename = 'tests.py'
1338 fullpath = os.path.join('.', 'api', filename)
1339 f = open(fullpath, 'r')
1341 meta={'HTTP_ETAG':'123',
1342 'HTTP_X_OBJECT_META_TEST':'test1'
1344 type, enc = mimetypes.guess_type(fullpath)
1345 meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text'
1347 meta['HTTP_CONTENT_ENCODING'] = enc
1348 r = self.upload_object(self.account,
1352 content_type = meta['HTTP_CONTENT_TYPE'],
1354 self.assertEqual(r.status_code, 422)
1356 def test_chucked_update(self):
1357 objname = os.path.split(self.src)[-1:][0]
1358 f = open(self.dest, 'r')
1361 type, enc = mimetypes.guess_type(self.dest)
1362 meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text'
1364 meta['HTTP_CONTENT_ENCODING'] = enc
1365 meta.update({'HTTP_TRANSFER_ENCODING':'chunked'})
1366 r = self.upload_object(self.account,
1370 content_type = 'plain/text',
1372 self.assertEqual(r.status_code, 201)
1374 r = self.get_object(self.account,
1377 uploaded_data = r.content
1378 f = open(self.src, 'r')
1379 actual_data = f.read()
1380 self.assertEqual(actual_data, uploaded_data)
1382 class ObjectCopy(BaseTestCase):
1384 BaseTestCase.setUp(self)
1385 self.account = 'test'
1386 self.containers = ['c1', 'c2']
1387 for c in self.containers:
1388 self.create_container(self.account, c)
1389 self.obj = self.upload_os_file(self.account,
1394 for c in self.containers:
1395 for o in get_content_splitted(self.list_objects(self.account, c)):
1396 self.delete_object(self.account, c, o)
1397 self.delete_container(self.account, c)
1399 def test_copy(self):
1400 with AssertInvariant(self.get_object_meta, self.account,
1401 self.containers[0], self.obj['name']):
1403 meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1404 src_path = os.path.join('/', self.containers[0], self.obj['name'])
1405 r = self.copy_object(self.account,
1410 #assert copy success
1411 self.assertEqual(r.status_code, 201)
1413 #assert access the new object
1414 r = self.get_object_meta(self.account,
1417 self.assertTrue(r['X-Object-Meta-Test'])
1418 self.assertTrue(r['X-Object-Meta-Test'], 'testcopy')
1420 #assert etag is the same
1421 self.assertEqual(r['ETag'], self.obj['hash'])
1423 #assert src object still exists
1424 r = self.get_object_meta(self.account, self.containers[0],
1426 self.assertEqual(r.status_code, 200)
1428 def test_copy_from_different_container(self):
1429 with AssertInvariant(self.get_object_meta,
1433 meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1434 src_path = os.path.join('/', self.containers[0], self.obj['name'])
1435 r = self.copy_object(self.account,
1440 self.assertEqual(r.status_code, 201)
1442 # assert updated metadata
1443 r = self.get_object_meta(self.account,
1446 self.assertTrue(r['X-Object-Meta-Test'])
1447 self.assertTrue(r['X-Object-Meta-Test'], 'testcopy')
1449 #assert src object still exists
1450 r = self.get_object_meta(self.account, self.containers[0],
1452 self.assertEqual(r.status_code, 200)
1454 def test_copy_invalid(self):
1455 #copy from invalid object
1456 meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1457 r = self.copy_object(self.account,
1460 os.path.join('/', self.containers[0], 'test.py'),
1462 self.assertItemNotFound(r)
1464 #copy from invalid container
1465 meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1466 src_path = os.path.join('/', self.containers[1], self.obj['name'])
1467 r = self.copy_object(self.account,
1472 self.assertItemNotFound(r)
1474 class ObjectMove(ObjectCopy):
1475 def test_move(self):
1477 meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1478 src_path = os.path.join('/', self.containers[0], self.obj['name'])
1479 r = self.move_object(self.account,
1484 #assert successful move
1485 self.assertEqual(r.status_code, 201)
1487 #assert updated metadata
1488 r = self.get_object_meta(self.account,
1491 self.assertTrue(r['X-Object-Meta-Test'])
1492 self.assertTrue(r['X-Object-Meta-Test'], 'testcopy')
1494 #assert src object no more exists
1495 r = self.get_object_meta(self.account, self.containers[0],
1497 self.assertItemNotFound(r)
1499 class ObjectPost(BaseTestCase):
1501 BaseTestCase.setUp(self)
1502 self.account = 'test'
1503 self.containers = ['c1', 'c2']
1504 for c in self.containers:
1505 self.create_container(self.account, c)
1506 self.obj = self.upload_os_file(self.account,
1511 for c in self.containers:
1512 for o in get_content_splitted(self.list_objects(self.account, c)):
1513 self.delete_object(self.account, c, o)
1514 self.delete_container(self.account, c)
1516 def test_update_meta(self):
1517 #perform update metadata
1518 more = {'HTTP_X_OBJECT_META_FOO':'foo',
1519 'HTTP_X_OBJECT_META_BAR':'bar'}
1520 r = self.update_object(self.account,
1524 #assert request accepted
1525 self.assertEqual(r.status_code, 202)
1527 #assert old metadata are still there
1528 r = self.get_object_meta(self.account, self.containers[0],
1530 self.assertTrue('X-Object-Meta-Test' not in r.items())
1532 #assert new metadata have been updated
1533 for k,v in more.items():
1534 key = '-'.join(elem.capitalize() for elem in k.split('_')[1:])
1535 self.assertTrue(r[key])
1536 self.assertTrue(r[key], v)
1538 def test_update_object(self,
1541 instance_length = True,
1542 content_length = 500):
1543 l = len(self.obj['data'])
1544 length = instance_length and l or '*'
1545 range = 'bytes %d-%d/%s' %(first_byte_pos,
1548 partial = last_byte_pos - first_byte_pos + 1
1549 data = get_random_data(partial)
1550 more = {'HTTP_CONTENT_RANGE':range}
1552 more.update({'CONTENT_LENGTH':'%s' % content_length})
1554 r = self.update_object(self.account,
1558 'application/octet-stream',
1561 if partial < 0 or (instance_length and l <= last_byte_pos):
1562 self.assertEqual(r.status_code, 202)
1563 elif content_length and content_length != partial:
1564 self.assertEqual(r.status_code, 400)
1566 self.assertEqual(r.status_code, 204)
1568 #check modified object
1569 r = self.get_object(self.account,
1572 self.assertEqual(r.content[0:partial], data)
1573 self.assertEqual(r.content[partial:l], self.obj['data'][partial:l])
def test_update_object_no_content_length(self):
    # Replay the shared test_update_object scenario, overriding only
    # content_length (set to None); every other argument keeps its default.
    overrides = {'content_length': None}
    self.test_update_object(**overrides)
1578 def test_update_object_invalid_content_length(self):
1579 with AssertContentInvariant(self.get_object, self.account, self.containers[0],
1581 self.test_update_object(content_length = 1000)
def test_update_object_with_unknown_instance_length(self):
    # Replay the shared test_update_object scenario, overriding only
    # instance_length (set to False); every other argument keeps its default.
    overrides = {'instance_length': False}
    self.test_update_object(**overrides)
1586 def test_update_object_invalid_range(self):
1587 with AssertContentInvariant(self.get_object, self.account, self.containers[0],
1589 self.test_update_object(499, 0, True)
1591 def test_update_object_invalid_range_and_length(self):
1592 with AssertContentInvariant(self.get_object, self.account, self.containers[0],
1594 self.test_update_object(499, 0, True, -1)
1596 def test_update_object_invalid_range_with_no_content_length(self):
1597 with AssertContentInvariant(self.get_object, self.account, self.containers[0],
1599 self.test_update_object(499, 0, True, content_length = None)
1601 def test_update_object_out_of_limits(self):
1602 with AssertContentInvariant(self.get_object, self.account, self.containers[0],
1604 l = len(self.obj['data'])
1605 self.test_update_object(0, l+1, True)
1607 def test_append(self):
1608 data = get_random_data(500)
1609 more = {'CONTENT_LENGTH':'500',
1610 'HTTP_CONTENT_RANGE':'bytes */*'}
1612 r = self.update_object(self.account,
1616 'application/octet-stream',
1619 self.assertEqual(r.status_code, 204)
1621 r = self.get_object(self.account,
1624 self.assertEqual(len(r.content), len(self.obj['data']) + 500)
1625 self.assertEqual(r.content[:-500], self.obj['data'])
1627 def test_update_with_chunked_transfer(self):
1628 data, pure = create_random_chunked_data()
1630 fl = len(self.obj['data'])
1631 meta = {'HTTP_TRANSFER_ENCODING':'chunked',
1632 'HTTP_CONTENT_RANGE':'bytes 0-/%d' %fl}
1633 r = self.update_object(self.account,
1637 'application/octet-stream',
1640 #check modified object
1641 r = self.get_object(self.account,
1644 self.assertEqual(r.content[0:dl], pure)
1645 self.assertEqual(r.content[dl:fl], self.obj['data'][dl:fl])
1647 def test_update_with_chunked_transfer_strict_range(self):
1648 data, pure = create_random_chunked_data()
1650 fl = len(self.obj['data'])
1651 meta = {'HTTP_TRANSFER_ENCODING':'chunked',
1652 'HTTP_CONTENT_RANGE':'bytes 0-%d/%d' %(dl, fl)}
1653 r = self.update_object(self.account,
1657 'application/octet-stream',
1660 #check modified object
1661 r = self.get_object(self.account,
1664 self.assertEqual(r.content[0:dl+1], pure)
1665 self.assertEqual(r.content[dl+1:fl], self.obj['data'][dl+1:fl])
1667 class ObjectDelete(BaseTestCase):
1669 BaseTestCase.setUp(self)
1670 self.account = 'test'
1671 self.containers = ['c1', 'c2']
1672 for c in self.containers:
1673 self.create_container(self.account, c)
1674 self.obj = self.upload_os_file(self.account,
1679 for c in self.containers:
1680 for o in get_content_splitted(self.list_objects(self.account, c)):
1681 self.delete_object(self.account, c, o)
1682 self.delete_container(self.account, c)
1684 def test_delete(self):
1685 #perform delete object
1686 r = self.delete_object(self.account, self.containers[0],
1690 self.assertEqual(r.status_code, 204)
1692 def test_delete_invalid(self):
1693 #perform delete object
1694 r = self.delete_object(self.account, self.containers[1],
1698 self.assertItemNotFound(r)
class AssertInvariant(object):
    """Context manager checking that a call's ``items()`` stay unchanged.

    ``callable(*args, **kwargs)`` is invoked once on entry and once on
    exit; the ``.items()`` of the two results must compare equal,
    otherwise ``AssertionError`` is raised when the block is left.
    """

    def __init__(self, callable, *args, **kwargs):
        self.callable = callable
        # Kept on the instance: both __enter__ and __exit__ replay the call.
        self.args = args
        self.kwargs = kwargs

    def _snapshot(self):
        """Invoke the stored callable and return the items of its result."""
        return self.callable(*self.args, **self.kwargs).items()

    def __enter__(self):
        self.items = self._snapshot()
        return self

    def __exit__(self, type, value, tb):
        if type is not None:
            # The with-body already failed; let that error surface instead
            # of replacing it with an invariant failure.
            return False
        items = self._snapshot()
        if self.items != items:
            # Raised explicitly because a bare ``assert`` is removed when
            # Python runs with -O.
            raise AssertionError('items changed: %r != %r'
                                 % (self.items, items))
        return False
class AssertContentInvariant(object):
    """Context manager checking that a call's ``content`` stays unchanged.

    ``callable(*args, **kwargs)`` is invoked once on entry and once on
    exit; the ``.content`` of the two results must compare equal,
    otherwise ``AssertionError`` is raised when the block is left.
    """

    def __init__(self, callable, *args, **kwargs):
        self.callable = callable
        # Kept on the instance: both __enter__ and __exit__ replay the call.
        self.args = args
        self.kwargs = kwargs

    def _snapshot(self):
        """Invoke the stored callable and return the content of its result."""
        return self.callable(*self.args, **self.kwargs).content

    def __enter__(self):
        self.content = self._snapshot()
        return self

    def __exit__(self, type, value, tb):
        if type is not None:
            # The with-body already failed; let that error surface instead
            # of replacing it with an invariant failure.
            return False
        content = self._snapshot()
        if self.content != content:
            # Raised explicitly because a bare ``assert`` is removed when
            # Python runs with -O.
            raise AssertionError('content changed between entry and exit')
        return False
1728 def get_content_splitted(response):
1730 return response.content.split('\n')
1732 def compute_md5_hash(data):
1736 return md5.hexdigest().lower()
def compute_block_hash(data, algorithm):
    """Return the hex digest of ``data`` with trailing NUL bytes removed.

    ``algorithm`` is any name accepted by ``hashlib.new``. ``data`` may be
    a byte string or text; text is encoded as UTF-8 before hashing so the
    helper behaves the same on Python 2 and Python 3.
    """
    if not isinstance(data, bytes):
        # hashlib only accepts bytes on Python 3; on Python 2 ``bytes`` is
        # ``str``, so plain byte strings pass through untouched.
        data = data.encode('utf-8')
    h = hashlib.new(algorithm)
    # Trailing NULs are dropped before hashing; a bytes literal keeps
    # rstrip valid for byte strings on both Python lines.
    h.update(data.rstrip(b'\x00'))
    return h.hexdigest()
1743 def create_chunked_update_test_file(src, dest):
1745 fw = open(dest, 'w')
1746 data = fr.readline()
1748 fw.write(hex(len(data)))
1751 data = fr.readline()
1755 def create_random_chunked_data(rows=5):
1760 data = get_random_data(random.randint(1, 100))
1761 out.append(hex(len(data)))
1767 return '\r\n'.join(out), ''.join(pure)
def get_random_data(length=500):
    """Return ``length`` random characters drawn from A-Z and 0-9."""
    alphabet = string.ascii_uppercase + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
1773 o_names = ['kate.jpg',
1774 'kate_beckinsale.jpg',
1775 'How To Win Friends And Influence People.pdf',
1776 'moms_birthday.jpg',
1778 'Disturbed - Down With The Sickness.mp3',
1779 'army_of_darkness.avi',
1781 'photos/animals/dogs/poodle.jpg',
1782 'photos/animals/dogs/terrier.jpg',
1783 'photos/animals/cats/persian.jpg',
1784 'photos/animals/cats/siamese.jpg',
1785 'photos/plants/fern.jpg',
1786 'photos/plants/rose.jpg',