'Content-Type',
'Last-Modified',
'Content-Length',
- 'Date',),
+ 'Date',
+ 'X-Container-Block-Size',
+ 'X-Container-Block-Hash',),
'object':(
'ETag',
'Content-Length',
self.assert_headers(response, 'account', exp_meta)
return response
- def list_containers(self, account, limit=10000, marker='', format='', **headers):
+ def list_containers(self, account, limit=10000, marker='', format='',
+ **headers):
params = locals()
params.pop('self')
params.pop('account')
self.assert_headers(response, 'container', exp_meta)
return response
- def list_objects(self, account, container, limit=10000, marker='', prefix='', format='', path='', delimiter='', meta='', **headers):
+ def list_objects(self, account, container, limit=10000, marker='',
+ prefix='', format='', path='', delimiter='', meta='',
+ **headers):
params = locals()
params.pop('self')
params.pop('account')
def update_container_meta(self, account, name, **meta):
+ """POST to /v1/<account>/<name>, forwarding **meta to the client call.
+
+ The response body is stripped, the response is checked with
+ assert_status against 202, and the response is returned.
+ """
path = '/v1/%s/%s' %(account, name)
- response = self.client.post(path, data={}, content_type='text/xml', follow=False, **meta)
+ response = self.client.post(path,
+ data=None,
+ content_type='text/xml',
+ follow=False, **meta)
response.content = response.content.strip()
self.assert_status(response, 202)
return response
self.assert_headers(response, 'object')
return response
- def upload_object(self, account, container, name, data, content_type='application/json', **headers):
+ def upload_object(self, account, container, name, data, content_type='',
+ **headers):
+ """PUT data to /v1/<account>/<container>/<name>.
+
+ data, content_type and **headers are handed to self.client.put; the
+ response body is stripped, the response is checked with assert_status
+ against 201, and the response is returned.
+ """
path = '/v1/%s/%s/%s' %(account, container, name)
response = self.client.put(path, data, content_type, **headers)
response.content = response.content.strip()
self.assert_status(response, 201)
return response
- def update_object_meta(self, account, container, name, **headers):
+ def update_object(self, account, container, name, data=None,
+ content_type='MULTIPART_CONTENT', **headers):
+ """POST data to /v1/<account>/<container>/<name>.
+
+ data defaults to a fresh empty dict. data, content_type and **headers
+ are handed to self.client.post; the response body is stripped, the
+ response is checked with assert_status against [202, 204, 416], and
+ the response is returned.
+ """
+ # A None sentinel replaces the former mutable ``data={}`` default.
+ data = {} if data is None else data
+ # NOTE(review): 'MULTIPART_CONTENT' is a plain string literal; if the
+ # test client's MULTIPART_CONTENT constant was intended, it has to be
+ # imported and used without quotes -- confirm.
+ # NOTE(review): callers in this file also expect a 400 answer, which
+ # is not in the accepted status list below -- confirm.
path = '/v1/%s/%s/%s' %(account, container, name)
- response = self.client.post(path, **headers)
+ response = self.client.post(path, data, content_type, **headers)
response.content = response.content.strip()
- self.assert_status(response, 202)
+ self.assert_status(response, [202, 204, 416])
return response
def delete_object(self, account, container, name):
entities = ['Account', 'Container', 'Container-Object', 'Object']
user_defined_meta = ['X-%s-Meta' %elem for elem in entities]
headers = [item for item in response._headers.values()]
- system_headers = [h for h in headers if not h[0].startswith(tuple(user_defined_meta))]
+ t = tuple(user_defined_meta)
+ system_headers = [h for h in headers if not h[0].startswith(t)]
for h in system_headers:
self.assertTrue(h[0] in self.headers[type])
if exp_meta:
self.assertEqual(h[1], exp_meta[h[0]])
def assert_extended(self, response, format, type, size):
- self.assertEqual(response['Content-Type'].find(self.contentTypes[format]), 0)
+ exp_content_type = self.contentTypes[format]
+ self.assertEqual(response['Content-Type'].find(exp_content_type), 0)
if format == 'xml':
self.assert_xml(response, type, size)
elif format == 'json':
except IOError:
return
- def upload_random_data(self, account, container, name, length=1024, meta={}):
- char_set = string.ascii_uppercase + string.digits
- data = ''.join(random.choice(char_set) for x in range(length))
+ def upload_random_data(self, account, container, name, length=1024,
+ meta=None):
+ """Upload `length` random characters as object `name`.
+
+ The data comes from get_random_data; the result of upload_data is
+ returned unchanged.
+ """
+ # upload_data calls meta.update(...) on the dict it receives, so a
+ # shared ``meta={}`` default would carry headers over to later calls.
+ meta = {} if meta is None else meta
+ data = get_random_data(length)
return self.upload_data(account, container, name, data, meta)
def upload_data(self, account, container, name, data, meta={}):
obj['hash'] = compute_md5_hash(obj['data'])
meta.update({'HTTP_X_OBJECT_META_TEST':'test1',
'HTTP_ETAG':obj['hash']})
- meta['HTTP_CONTENT_TYPE'], enc = mimetypes.guess_type(name)
+ type, enc = mimetypes.guess_type(name)
+ meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text'
if enc:
- meta['HTTP_CONTENT_TYPE'] = enc
+ meta['HTTP_CONTENT_ENCODING'] = enc
+
obj['meta'] = meta
r = self.upload_object(account,
container,
response = self.get_account_meta(self.account)
r2 = self.list_containers(self.account)
containers = get_content_splitted(r2)
- self.assertEqual(response['X-Account-Container-Count'], str(len(containers)))
+ l = str(len(containers))
+ self.assertEqual(response['X-Account-Container-Count'], l)
size = 0
for c in containers:
r = self.get_container_meta(self.account, c)
self.assertEquals(self.containers[:2], containers)
def test_list_with_marker(self):
+ """List containers with limit 2 after the markers 'bananas' and
+ 'oranges'; each result must equal the slice of self.containers that
+ starts right after the marker."""
- limit = 2
- marker = 'bananas'
- response = self.list_containers(self.account, limit=limit, marker=marker)
+ l = 2
+ m = 'bananas'
+ response = self.list_containers(self.account, limit=l, marker=m)
containers = get_content_splitted(response)
- i = self.containers.index(marker) + 1
- self.assertEquals(self.containers[i:(i+limit)], containers)
+ # expected page begins one position past the marker
+ i = self.containers.index(m) + 1
+ self.assertEquals(self.containers[i:(i+l)], containers)
- marker = 'oranges'
- response = self.list_containers(self.account, limit=limit, marker=marker)
+ m = 'oranges'
+ response = self.list_containers(self.account, limit=l, marker=m)
containers = get_content_splitted(response)
- i = self.containers.index(marker) + 1
- self.assertEquals(self.containers[i:(i+limit)], containers)
+ i = self.containers.index(m) + 1
+ self.assertEquals(self.containers[i:(i+l)], containers)
#def test_extended_list(self):
# self.list_containers(self.account, limit=3, format='xml')
# self.list_containers(self.account, limit=3, format='json')
def test_list_json_with_marker(self):
+ """List containers as JSON with limit 2 after marker 'bananas' and
+ check that the first two entries are named 'kiwis' and 'oranges'."""
- limit = 2
- marker = 'bananas'
- response = self.list_containers(self.account, limit=limit, marker=marker, format='json')
+ l = 2
+ m = 'bananas'
+ response = self.list_containers(self.account, limit=l, marker=m,
+ format='json')
containers = json.loads(response.content)
self.assertEqual(containers[0]['name'], 'kiwis')
self.assertEqual(containers[1]['name'], 'oranges')
def test_list_xml_with_marker(self):
- limit = 2
- marker = 'oranges'
- response = self.list_containers(self.account, limit=limit, marker=marker, format='xml')
+ l = 2
+ m = 'oranges'
+ response = self.list_containers(self.account, limit=l, marker=m,
+ format='xml')
xml = minidom.parseString(response.content)
nodes = xml.getElementsByTagName('name')
self.assertEqual(len(nodes), 1)
self.delete_container(self.account, c)
def test_update_meta(self):
- meta = {'HTTP_X_ACCOUNT_META_TEST':'test', 'HTTP_X_ACCOUNT_META_TOST':'tost'}
+ meta = {'HTTP_X_ACCOUNT_META_TEST':'test',
+ 'HTTP_X_ACCOUNT_META_TOST':'tost'}
response = self.update_account_meta(self.account, **meta)
response = self.get_account_meta(self.account)
for k,v in meta.items():
#def test_invalid_account_update_meta(self):
# with AssertInvariant(self.get_account_meta, self.account):
- # meta = {'HTTP_X_ACCOUNT_META_TEST':'test', 'HTTP_X_ACCOUNT_META_TOST':'tost'}
+ # meta = {'HTTP_X_ACCOUNT_META_TEST':'test',
+ # 'HTTP_X_ACCOUNT_META_TOST':'tost'}
# response = self.update_account_meta('non-existing-account', **meta)
class ContainerHead(BaseTestCase):
'moms_birthday.jpg']
limit = 4
for m in markers:
- response = self.list_objects(self.account, self.container[0], limit=limit, marker=m)
+ response = self.list_objects(self.account, self.container[0],
+ limit=limit, marker=m)
objects = get_content_splitted(response)
l = [elem['name'] for elem in self.obj[:8]]
l.sort()
self.assertEqual(objects, l[start:end])
def test_list_pseudo_hierarchical_folders(self):
+ """Check object listings of self.container[1] for prefix 'photos'
+ and 'photos/animals' with delimiter '/', and for path 'photos'."""
- response = self.list_objects(self.account, self.container[1], prefix='photos', delimiter='/')
+ response = self.list_objects(self.account, self.container[1],
+ prefix='photos', delimiter='/')
objects = get_content_splitted(response)
- self.assertEquals(['photos/animals/', 'photos/me.jpg', 'photos/plants/'], objects)
+ self.assertEquals(['photos/animals/', 'photos/me.jpg',
+ 'photos/plants/'], objects)
- response = self.list_objects(self.account, self.container[1], prefix='photos/animals', delimiter='/')
- objects = get_content_splitted(response)
- self.assertEquals(['photos/animals/cats/', 'photos/animals/dogs/'], objects)
+ response = self.list_objects(self.account, self.container[1],
+ prefix='photos/animals', delimiter='/')
+ objs = get_content_splitted(response)
+ l = ['photos/animals/cats/', 'photos/animals/dogs/']
+ self.assertEquals(l, objs)
- response = self.list_objects(self.account, self.container[1], path='photos')
+ response = self.list_objects(self.account, self.container[1],
+ path='photos')
objects = get_content_splitted(response)
self.assertEquals(['photos/me.jpg'], objects)
def test_extended_list_json(self):
+ """List self.container[1] as JSON (limit 2, prefix 'photos/animals',
+ delimiter '/') and check the 'subdir' value of the first two entries."""
- response = self.list_objects(self.account, self.container[1], format='json', limit=2, prefix='photos/animals', delimiter='/')
+ response = self.list_objects(self.account,
+ self.container[1],
+ format='json', limit=2,
+ prefix='photos/animals',
+ delimiter='/')
objects = json.loads(response.content)
self.assertEqual(objects[0]['subdir'], 'photos/animals/cats/')
self.assertEqual(objects[1]['subdir'], 'photos/animals/dogs/')
def test_extended_list_xml(self):
- response = self.list_objects(self.account, self.container[1], format='xml', limit=4, prefix='photos', delimiter='/')
+ response = self.list_objects(self.account, self.container[1],
+ format='xml', limit=4, prefix='photos',
+ delimiter='/')
xml = minidom.parseString(response.content)
dirs = xml.getElementsByTagName('subdir')
self.assertEqual(len(dirs), 2)
def test_list_using_meta(self):
meta = {'HTTP_X_OBJECT_META_QUALITY':'aaa'}
for o in self.obj[:2]:
- r = self.update_object_meta(self.account,
+ r = self.update_object(self.account,
self.container[0],
o['name'],
**meta)
meta = {'HTTP_X_OBJECT_META_STOCK':'true'}
for o in self.obj[3:5]:
- r = self.update_object_meta(self.account,
+ r = self.update_object(self.account,
self.container[0],
o['name'],
**meta)
#assert success
self.assertEqual(r.status_code, 200)
+ objlist = self.list_objects(self.account, self.container[0])
self.assertEqual(get_content_splitted(r),
- get_content_splitted(self.list_objects(self.account,
- self.container[0])))
+ get_content_splitted(objlist))
def test_if_unmodified_since_precondition_failed(self):
t = datetime.datetime.utcnow()
response = self.create_container(self.account, self.containers[0])
if response.status_code == 201:
response = self.list_containers(self.account)
- self.assertTrue(self.containers[0] in get_content_splitted(response))
+ content = get_content_splitted(response)
+ self.assertTrue(self.containers[0] in content)
r = self.get_container_meta(self.account, self.containers[0])
self.assertEqual(r.status_code, 204)
def test_create_twice(self):
+ """Create self.containers[0]; if that answers 201, creating it a
+ second time must answer 202."""
response = self.create_container(self.account, self.containers[0])
if response.status_code == 201:
- self.assertTrue(self.create_container(self.account, self.containers[0]).status_code, 202)
+ r = self.create_container(self.account, self.containers[0])
+ # assertTrue(x, 202) would only test truthiness and use 202 as the
+ # failure message; compare the status code explicitly instead.
+ self.assertEqual(r.status_code, 202)
class ContainerPost(BaseTestCase):
def setUp(self):
def test_update_meta(self):
meta = {'HTTP_X_CONTAINER_META_TEST':'test33',
'HTTP_X_CONTAINER_META_TOST':'tost22'}
- response = self.update_container_meta(self.account, self.container, **meta)
+ response = self.update_container_meta(self.account, self.container,
+ **meta)
response = self.get_container_meta(self.account, self.container)
for k,v in meta.items():
key = '-'.join(elem.capitalize() for elem in k.split('_')[1:])
self.objects[0]['meta'])
#assert success
self.assertEqual(r.status_code, 200)
+
+ #assert content-type
+ self.assertEqual(r['Content-Type'],
+ self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
def test_get_invalid(self):
r = self.get_object(self.account,
#assert successful partial content
self.assertEqual(r.status_code, 206)
+ #assert content-type
+ self.assertEqual(r['Content-Type'],
+ self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
+
#assert content length
self.assertEqual(int(r['Content-Length']), 500)
#assert successful partial content
self.assertEqual(r.status_code, 206)
+ #assert content-type
+ self.assertEqual(r['Content-Type'],
+ self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
+
#assert content length
self.assertEqual(int(r['Content-Length']), 500)
#assert successful partial content
self.assertEqual(r.status_code, 206)
+ #assert content-type
+ self.assertEqual(r['Content-Type'],
+ self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
+
#assert content length
self.assertEqual(int(r['Content-Length']), 500)
#assert get success
self.assertEqual(r.status_code, 200)
+ #assert content-type
+ self.assertEqual(r['Content-Type'],
+ self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
+
#assert response content
self.assertEqual(self.objects[0]['data'].strip(), r.content.strip())
#assert get success
self.assertEqual(r.status_code, 200)
+ #assert content-type
+ self.assertEqual(r['Content-Type'],
+ self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
+
#assert response content
self.assertEqual(self.objects[0]['data'].strip(), r.content.strip())
#assert get success
self.assertEqual(r.status_code, 200)
+ #assert content-type
+ self.assertEqual(r['Content-Type'],
+ self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
+
+ #assert content-type
+ self.assertEqual(r['Content-Type'],
+ self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
+
#assert response content
self.assertEqual(self.objects[0]['data'].strip(), r.content.strip())
#assert get success
self.assertEqual(r.status_code, 200)
+
+ #assert content-type
+ self.assertEqual(r['Content-Type'],
+ self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
def test_if_none_match(self):
#perform get with If-None-Match *
#assert get success
self.assertEqual(r.status_code, 200)
+
+ #assert content-type
+ self.assertEqual(r['Content-Type'],
+ self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
def test_if_modified_since_invalid_date(self):
headers = {'HTTP_IF_MODIFIED_SINCE':''}
#assert get success
self.assertEqual(r.status_code, 200)
+
+ #assert content-type
+ self.assertEqual(r['Content-Type'],
+ self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
def test_if_not_modified_since(self):
now = datetime.datetime.utcnow()
#assert success
self.assertEqual(r.status_code, 200)
self.assertEqual(self.objects[0]['data'], r.content)
+
+ #assert content-type
+ self.assertEqual(r['Content-Type'],
+ self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
def test_if_unmodified_since_precondition_failed(self):
t = datetime.datetime.utcnow()
self.assertEqual(r.status_code, 412)
def test_hashes(self):
- #block_size = 4 * 1024 * 1024
- block_size = 128 * 1024
- block_num = 2
- l = block_size * block_num + 1
- fname = 'largefile.txt'
+ l = 8388609
+ fname = 'largefile'
o = self.upload_random_data(self.account,
self.containers[1],
fname,
self.containers[1],
fname,
'json')
- hashes = json.loads(r.content)['hashes']
- self.assertTrue(len(hashes), block_num + 1)
+ body = json.loads(r.content)
+ hashes = body['hashes']
+ block_size = body['block_size']
+ block_hash = body['block_hash']
+ block_num = l/block_size == 0 and l/block_size or l/block_size + 1
+ self.assertTrue(len(hashes), block_num)
i = 0
for h in hashes:
start = i * block_size
end = (i + 1) * block_size
- self.assertEqual(h, compute_block_hash(o['data'][start:end]))
+ hash = compute_block_hash(o['data'][start:end], block_hash)
+ self.assertEqual(h, hash)
i += 1
class ObjectPut(BaseTestCase):
create_chunked_update_test_file(self.src, self.dest)
def tearDown(self):
+ """Delete the objects listed for self.container and the container
+ itself."""
- for o in get_content_splitted(self.list_objects(self.account, self.container)):
+ r = self.list_objects(self.account, self.container)
+ for o in get_content_splitted(r):
self.delete_object(self.account, self.container, o)
self.delete_container(self.account, self.container)
'HTTP_X_OBJECT_MANIFEST':123,
'HTTP_X_OBJECT_META_TEST':'test1'
}
- meta['HTTP_CONTENT_TYPE'], meta['HTTP_CONTENT_ENCODING'] = mimetypes.guess_type(fullpath)
+ type, enc = mimetypes.guess_type(fullpath)
+ meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text'
+ if enc:
+ meta['HTTP_CONTENT_ENCODING'] = enc
r = self.upload_object(self.account,
self.container,
filename,
self.assertEqual(r.status_code, 201)
r = self.get_object_meta(self.account, self.container, filename)
self.assertTrue(r['X-Object-Meta-Test'])
- self.assertEqual(r['X-Object-Meta-Test'], meta['HTTP_X_OBJECT_META_TEST'])
+ self.assertEqual(r['X-Object-Meta-Test'],
+ meta['HTTP_X_OBJECT_META_TEST'])
#assert uploaded content
r = self.get_object(self.account, self.container, filename)
'HTTP_X_OBJECT_MANIFEST':123,
'HTTP_X_OBJECT_META_TEST':'test1'
}
- meta['HTTP_CONTENT_TYPE'], meta['HTTP_CONTENT_ENCODING'] = mimetypes.guess_type(fullpath)
+ type, enc = mimetypes.guess_type(fullpath)
+ meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text'
+ if enc:
+ meta['HTTP_CONTENT_ENCODING'] = enc
r = self.upload_object(self.account,
self.container,
filename,
f = open(self.dest, 'r')
data = f.read()
meta = {}
- meta['HTTP_CONTENT_TYPE'], meta['HTTP_CONTENT_ENCODING'] = mimetypes.guess_type(objname)
+ type, enc = mimetypes.guess_type(self.dest)
+ meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text'
+ if enc:
+ meta['HTTP_CONTENT_ENCODING'] = enc
meta.update({'HTTP_TRANSFER_ENCODING':'chunked'})
r = self.upload_object(self.account,
self.container,
self.delete_container(self.account, c)
def test_copy(self):
- with AssertInvariant(self.get_object_meta, self.account, self.containers[0], self.obj['name']):
+ with AssertInvariant(self.get_object_meta, self.account,
+ self.containers[0], self.obj['name']):
#perform copy
meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
src_path = os.path.join('/', self.containers[0], self.obj['name'])
self.assertEqual(r['ETag'], self.obj['hash'])
#assert src object still exists
- r = self.get_object_meta(self.account, self.containers[0], self.obj['name'])
+ r = self.get_object_meta(self.account, self.containers[0],
+ self.obj['name'])
self.assertEqual(r.status_code, 204)
def test_copy_from_different_container(self):
self.assertTrue(r['X-Object-Meta-Test'], 'testcopy')
#assert src object still exists
- r = self.get_object_meta(self.account, self.containers[0], self.obj['name'])
+ r = self.get_object_meta(self.account, self.containers[0],
+ self.obj['name'])
self.assertEqual(r.status_code, 204)
def test_copy_invalid(self):
self.assertTrue(r['X-Object-Meta-Test'], 'testcopy')
#assert src object no more exists
- r = self.get_object_meta(self.account, self.containers[0], self.obj['name'])
+ r = self.get_object_meta(self.account, self.containers[0],
+ self.obj['name'])
self.assertItemNotFound(r)
class ObjectPost(BaseTestCase):
self.delete_object(self.account, c, o)
self.delete_container(self.account, c)
- def test_update(self):
+ def test_update_meta(self):
#perform update metadata
more = {'HTTP_X_OBJECT_META_FOO':'foo',
'HTTP_X_OBJECT_META_BAR':'bar'}
- r = self.update_object_meta(self.account,
+ r = self.update_object(self.account,
self.containers[0],
self.obj['name'],
**more)
self.assertEqual(r.status_code, 202)
#assert old metadata are still there
- r = self.get_object_meta(self.account, self.containers[0], self.obj['name'])
+ r = self.get_object_meta(self.account, self.containers[0],
+ self.obj['name'])
self.assertTrue('X-Object-Meta-Test' not in r.items())
#assert new metadata have been updated
self.assertTrue(r[key])
self.assertTrue(r[key], v)
+ def test_update_object(self,
+ first_byte_pos=0,
+ last_byte_pos=499,
+ instance_length = True,
+ content_length = 500):
+ """Overwrite a byte range of self.obj with random data and check
+ the answer.
+
+ first_byte_pos / last_byte_pos -- bounds sent in Content-Range
+ instance_length -- send the object length in the range when true,
+ '*' otherwise
+ content_length -- value sent as CONTENT_LENGTH; nothing is sent when
+ it is falsy
+
+ Expected status: 202 for a negative range size or a last byte at or
+ past the object length (with instance_length), 400 when
+ content_length differs from the range size, 204 otherwise.
+ """
+ l = len(self.obj['data'])
+ # named content_range so the builtin range() is not shadowed
+ if instance_length:
+ content_range = 'bytes %d-%d/%d' %(first_byte_pos,
+ last_byte_pos,
+ l)
+ else:
+ content_range = 'bytes %d-%d/*' %(first_byte_pos,
+ last_byte_pos)
+ partial = last_byte_pos - first_byte_pos + 1
+ data = get_random_data(partial)
+ more = {'HTTP_CONTENT_RANGE':content_range}
+ if content_length:
+ more.update({'CONTENT_LENGTH':'%s' % content_length})
+
+ r = self.update_object(self.account,
+ self.containers[0],
+ self.obj['name'],
+ data,
+ 'application/octet-stream',
+ **more)
+
+ if partial < 0 or (instance_length and l <= last_byte_pos):
+ self.assertEqual(r.status_code, 202)
+ elif content_length and content_length != partial:
+ self.assertEqual(r.status_code, 400)
+ else:
+ self.assertEqual(r.status_code, 204)
+
+ #check modified object
+ # NOTE(review): presumably this check belongs to the 204 branch only
+ # (nesting is not recoverable from this view) -- confirm.
+ r = self.get_object(self.account,
+ self.containers[0],
+ self.obj['name'])
+ # compare at the requested offsets, not from byte 0, so a non-zero
+ # first_byte_pos is verified correctly
+ end = last_byte_pos + 1
+ orig = self.obj['data']
+ self.assertEqual(r.content[:first_byte_pos], orig[:first_byte_pos])
+ self.assertEqual(r.content[first_byte_pos:end], data)
+ self.assertEqual(r.content[end:l], orig[end:l])
+
+ def test_update_object_no_content_length(self):
+ """Run test_update_object with content_length=None, i.e. without a
+ CONTENT_LENGTH entry in the request."""
+ self.test_update_object(content_length = None)
+
+ def test_update_object_invalid_content_length(self):
+ """Run test_update_object with content_length=1000 against the
+ default 0-499 range; AssertContentInvariant compares the object's
+ content before and after."""
+ with AssertContentInvariant(self.get_object, self.account, self.containers[0],
+ self.obj['name']):
+ self.test_update_object(content_length = 1000)
+
+ def test_update_object_with_unknown_instance_length(self):
+ """Run test_update_object with instance_length=False, so the
+ Content-Range ends in '/*' instead of the object length."""
+ self.test_update_object(instance_length = False)
+
+ def test_update_object_invalid_range(self):
+ """Run test_update_object with first byte 499 and last byte 0;
+ AssertContentInvariant compares the object's content before and
+ after."""
+ with AssertContentInvariant(self.get_object, self.account, self.containers[0],
+ self.obj['name']):
+ self.test_update_object(499, 0, True)
+
+ def test_update_object_invalid_range_and_length(self):
+ """Run test_update_object with first byte 499, last byte 0 and
+ content_length -1; AssertContentInvariant compares the object's
+ content before and after."""
+ with AssertContentInvariant(self.get_object, self.account, self.containers[0],
+ self.obj['name']):
+ self.test_update_object(499, 0, True, -1)
+
+ def test_update_object_invalid_range_with_no_content_length(self):
+ """Run test_update_object with first byte 499, last byte 0 and
+ content_length=None; AssertContentInvariant compares the object's
+ content before and after."""
+ with AssertContentInvariant(self.get_object, self.account, self.containers[0],
+ self.obj['name']):
+ self.test_update_object(499, 0, True, content_length = None)
+
+ def test_update_object_out_of_limits(self):
+ """Run test_update_object with a last byte position one past the
+ object's length; AssertContentInvariant compares the object's
+ content before and after."""
+ with AssertContentInvariant(self.get_object, self.account, self.containers[0],
+ self.obj['name']):
+ l = len(self.obj['data'])
+ self.test_update_object(0, l+1, True)
+
+ def test_append(self):
+ """POST 500 random characters with Content-Range 'bytes */*',
+ expect 204, then check the object is 500 bytes longer than
+ self.obj['data']."""
+ data = get_random_data(500)
+ more = {'CONTENT_LENGTH':'500',
+ 'HTTP_CONTENT_RANGE':'bytes */*'}
+
+ r = self.update_object(self.account,
+ self.containers[0],
+ self.obj['name'],
+ data,
+ 'application/octet-stream',
+ **more)
+
+ self.assertEqual(r.status_code, 204)
+
+ # only the resulting length is verified, not the appended bytes
+ r = self.get_object(self.account,
+ self.containers[0],
+ self.obj['name'])
+ self.assertEqual(len(r.content), len(self.obj['data']) + 500)
+
+ #def test_update_with_chunked_transfer(self):
+ # linenum = 5
+ # data = create_random_chunked_data(linenum)
+ # print data
+ # first_byte_pos=0,
+ # last_byte_pos=499,
+ # instance_length = True,
+ # content_length = 500
+ # l = len(self.obj['data'])
+ # meta = {'HTTP_TRANSFER_ENCODING':'chunked',
+ # 'HTTP_CONTENT_RANGE':'bytes 0-499/%d' %l}
+ # r = self.update_object(self.account,
+ # self.containers[0],
+ # self.obj['name'],
+ # data,
+ # 'application/octet-stream',
+ # **meta)
+ # print r.request, r.status_code, r
+
class ObjectDelete(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
def test_delete(self):
+ """Delete self.obj from self.containers[0] and expect status 204."""
#perform delete object
- r = self.delete_object(self.account, self.containers[0], self.obj['name'])
+ r = self.delete_object(self.account, self.containers[0],
+ self.obj['name'])
#assert success
self.assertEqual(r.status_code, 204)
def test_delete_invalid(self):
+ """Try to delete self.obj's name from self.containers[1] and check
+ the answer with assertItemNotFound."""
#perform delete object
- r = self.delete_object(self.account, self.containers[1], self.obj['name'])
+ r = self.delete_object(self.account, self.containers[1],
+ self.obj['name'])
#assert failure
self.assertItemNotFound(r)
items = self.callable(*self.args, **self.kwargs).items()
assert self.items == items
+class AssertContentInvariant(object):
+ """Context manager checking that a call's ``.content`` is unchanged.
+
+ callable(*args, **kwargs) is invoked on entry and again on exit; an
+ AssertionError is raised on exit when the two ``.content`` values
+ differ. The content read on entry is what ``with ... as`` binds.
+ """
+ def __init__(self, callable, *args, **kwargs):
+ self.callable = callable
+ self.args = args
+ self.kwargs = kwargs
+
+ def __enter__(self):
+ self.content = self.callable(*self.args, **self.kwargs).content
+ return self.content
+
+ def __exit__(self, type, value, tb):
+ content = self.callable(*self.args, **self.kwargs).content
+ # Raise explicitly: a bare ``assert`` is removed under ``python -O``
+ # and would silently disable the invariant.
+ if not (self.content == content):
+ raise AssertionError('content changed inside the with block')
+
def get_content_splitted(response):
if response:
return response.content.split('\n')
md5.update(data)
return md5.hexdigest().lower()
-def compute_block_hash(data):
- h = hashlib.new(backend.hash_algorithm)
+def compute_block_hash(data, algorithm):
+ """Return the hex digest of data under the hashlib algorithm named
+ `algorithm`; trailing NUL characters are stripped from data first."""
+ h = hashlib.new(algorithm)
h.update(data.rstrip('\x00'))
return h.hexdigest()
fw.write(hex(0))
fw.write('\r\n')
+def create_random_chunked_data(rows):
+ """Build a chunked-transfer body made of `rows` random chunks.
+
+ Each chunk holds 1 to 100 random characters from get_random_data and
+ is preceded by its length in hexadecimal; a zero-length chunk ends
+ the body. All pieces are joined with CRLF and returned as one string.
+ """
+ out = []
+ for _ in range(rows):
+ data = get_random_data(random.randint(1, 100))
+ # chunk-size must be bare hex digits; hex() would prepend '0x'
+ out.append('%x' % len(data))
+ out.append(data)
+ # after the loop: last-chunk marker, then the closing CRLF
+ out.append('0')
+ out.append('\r\n')
+ return '\r\n'.join(out)
+
+def get_random_data(length):
+ """Return a string of `length` characters picked at random from the
+ uppercase ASCII letters and the digits."""
+ alphabet = string.ascii_uppercase + string.digits
+ picks = [random.choice(alphabet) for _ in range(length)]
+ return ''.join(picks)
+
o_names = ['kate.jpg',
'kate_beckinsale.jpg',
'How To Win Friends And Influence People.pdf',