import mimetypes
import random
import datetime
+import string
+from pithos.backends import backend
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
"%A, %d-%b-%y %H:%M:%S GMT",
'content_encoding',
'last_modified',)}
self.return_codes = (400, 401, 404, 503,)
-
+
def assertFault(self, response, status_code, name):
    # Check that `response` carries the expected HTTP status code.
    # NOTE(review): `name` (the fault name, e.g. 'badRequest') is accepted
    # but not compared against anything here -- confirm whether the
    # response body should also be checked.
    self.assertEqual(response.status_code, status_code)
-
+
def assertBadRequest(self, response):
    # Shorthand for assertFault with status 400 and fault name 'badRequest'.
    self.assertFault(response, 400, 'badRequest')
-
+
def assertItemNotFound(self, response):
    # Shorthand for assertFault with status 404 and fault name 'itemNotFound'.
    self.assertFault(response, 404, 'itemNotFound')
self.assert_headers(response, 'account', exp_meta)
return response
- def list_containers(self, account, limit=10000, marker='', format=''):
+ def list_containers(self, account, limit=10000, marker='', format='', **headers):
params = locals()
params.pop('self')
params.pop('account')
path = '/v1/%s' % account
- response = self.client.get(path, params)
- self.assert_status(response, [200, 204])
+ response = self.client.get(path, params, **headers)
+ self.assert_status(response, [200, 204, 304, 412])
response.content = response.content.strip()
if format:
self.assert_extended(response, format, 'container', limit)
self.assert_headers(response, 'container', exp_meta)
return response
- def list_objects(self, account, container, limit=10000, marker='', prefix='', format='', path='', delimiter='', meta=''):
def list_objects(self, account, container, limit=10000, marker='',
                 prefix='', format='', path='', delimiter='', meta='',
                 **headers):
    """List the objects of `container` and sanity-check the response.

    The named listing options are sent as query parameters; any extra
    keyword arguments (e.g. HTTP_IF_MODIFIED_SINCE) are forwarded to the
    test client as request headers.  Returns the response with its
    content stripped of surrounding whitespace.
    """
    # Build the query explicitly: snapshotting locals() would also pick up
    # the `headers` dict and leak it into the query string.
    params = {
        'limit': limit,
        'marker': marker,
        'prefix': prefix,
        'format': format,
        'path': path,
        'delimiter': delimiter,
        'meta': meta,
    }
    # `path` is a listing option, so the request URL gets its own name.
    url = '/v1/%s/%s' % (account, container)
    response = self.client.get(url, params, **headers)
    response.content = response.content.strip()
    if format:
        self.assert_extended(response, format, 'object', limit)
    # 304 / 412 are legitimate outcomes of conditional requests.
    self.assert_status(response, [200, 204, 304, 412])
    return response
def create_container(self, account, name, **meta):
self.assert_status(response, 204)
return response
- def get_object(self, account, container, name, exp_meta={}, **headers):
+ def get_object(self, account, container, name, format='', **headers):
path = '/v1/%s/%s/%s' %(account, container, name)
- response = self.client.get(path, **headers)
+ response = self.client.get(path, {'format':format}, **headers)
response.content = response.content.strip()
self.assert_status(response, [200, 206, 304, 412, 416])
if response.status_code in [200, 206]:
return
def upload_random_data(self, account, container, name, length=1024, meta=None):
    """Upload `length` random characters as object `name`.

    The payload is drawn from uppercase ASCII letters and digits.  `meta`
    holds extra request headers; when omitted a fresh dict is used for
    every call.  Returns whatever `upload_data` returns.
    """
    # upload_data() updates the dict it is given, so a shared default
    # would carry headers over from one call to the next.
    if meta is None:
        meta = {}
    alphabet = string.ascii_uppercase + string.digits
    data = ''.join(random.choice(alphabet) for _ in range(length))
    return self.upload_data(account, container, name, data, meta)
def upload_data(self, account, container, name, data, meta={}):
obj['name'] = name
try:
obj['data'] = data
- obj['hash'] = compute_hash(obj['data'])
+ obj['hash'] = compute_md5_hash(obj['data'])
meta.update({'HTTP_X_OBJECT_META_TEST':'test1',
'HTTP_ETAG':obj['hash']})
meta['HTTP_CONTENT_TYPE'], enc = mimetypes.guess_type(name)
except IOError:
return
-class ListContainers(BaseTestCase):
class AccountHead(BaseTestCase):
    """Account metadata checks run against a fixed set of containers."""

    def setUp(self):
        BaseTestCase.setUp(self)
        self.account = 'test'
        self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
        for name in self.containers:
            self.create_container(self.account, name)

    def tearDown(self):
        # Delete every container the account currently lists.
        listing = self.list_containers(self.account)
        for name in get_content_splitted(listing):
            self.delete_container(self.account, name)

    def test_get_account_meta(self):
        # The account totals must agree with the per-container figures.
        response = self.get_account_meta(self.account)
        containers = get_content_splitted(self.list_containers(self.account))
        self.assertEqual(response['X-Account-Container-Count'],
                         str(len(containers)))
        size = sum(
            int(self.get_container_meta(self.account, c)['X-Container-Bytes-Used'])
            for c in containers)
        self.assertEqual(response['X-Account-Bytes-Used'], str(size))

    # Disabled test, kept for reference:
    #def test_get_account_401(self):
    #    response = self.get_account_meta('non-existing-account')
    #    print response
    #    self.assertEqual(response.status_code, 401)
+
+class AccountGet(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.account = 'test'
self.assertEqual(len(nodes), 1)
self.assertEqual(nodes[0].childNodes[0].data, 'pears')
-class AccountMetadata(BaseTestCase):
+ def test_if_modified_since(self):
+ t = datetime.datetime.utcnow()
+ t2 = t - datetime.timedelta(minutes=10)
+
+ #add a new container
+ self.create_container(self.account,
+ 'dummy')
+
+ for f in DATE_FORMATS:
+ past = t2.strftime(f)
+
+ headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %past}
+ r = self.list_containers(self.account, **headers)
+
+ #assert get success
+ self.assertEqual(r.status_code, 200)
+
+ def test_if_modified_since_invalid_date(self):
+ headers = {'HTTP_IF_MODIFIED_SINCE':''}
+ r = self.list_containers(self.account, **headers)
+
+ #assert get success
+ self.assertEqual(r.status_code, 200)
+
+ def test_if_not_modified_since(self):
+ now = datetime.datetime.utcnow()
+ since = now + datetime.timedelta(1)
+
+ for f in DATE_FORMATS:
+ headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %since.strftime(f)}
+ r = self.list_containers(self.account, **headers)
+
+ #assert not modified
+ self.assertEqual(r.status_code, 304)
+
+ def test_if_unmodified_since(self):
+ now = datetime.datetime.utcnow()
+ since = now + datetime.timedelta(1)
+
+ for f in DATE_FORMATS:
+ headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %since.strftime(f)}
+ r = self.list_containers(self.account, **headers)
+
+ #assert success
+ self.assertEqual(r.status_code, 200)
+ self.assertEqual(self.containers, get_content_splitted(r))
+
+ def test_if_unmodified_since_precondition_failed(self):
+ t = datetime.datetime.utcnow()
+ t2 = t - datetime.timedelta(minutes=10)
+
+ #add a new container
+ self.create_container(self.account,
+ 'dummy')
+
+ for f in DATE_FORMATS:
+ past = t2.strftime(f)
+
+ headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %past}
+ r = self.list_containers(self.account, **headers)
+
+ #assert get success
+ self.assertEqual(r.status_code, 412)
+
+class AccountPost(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.account = 'test'
for c in get_content_splitted(self.list_containers(self.account)):
self.delete_container(self.account, c)
- def test_get_account_meta(self):
- response = self.get_account_meta(self.account)
- r2 = self.list_containers(self.account)
- containers = get_content_splitted(r2)
- self.assertEqual(response['X-Account-Container-Count'], str(len(containers)))
- size = 0
- for c in containers:
- r = self.get_container_meta(self.account, c)
- size = size + int(r['X-Container-Bytes-Used'])
- self.assertEqual(response['X-Account-Bytes-Used'], str(size))
-
- #def test_get_account_401(self):
- # response = self.get_account_meta('non-existing-account')
- # print response
- # self.assertEqual(response.status_code, 401)
-
def test_update_meta(self):
meta = {'HTTP_X_ACCOUNT_META_TEST':'test', 'HTTP_X_ACCOUNT_META_TOST':'tost'}
response = self.update_account_meta(self.account, **meta)
# meta = {'HTTP_X_ACCOUNT_META_TEST':'test', 'HTTP_X_ACCOUNT_META_TOST':'tost'}
# response = self.update_account_meta('non-existing-account', **meta)
-class ListObjects(BaseTestCase):
class ContainerHead(BaseTestCase):
    """Container metadata checks run against a single 'apples' container."""

    def setUp(self):
        BaseTestCase.setUp(self)
        self.account = 'test'
        self.container = 'apples'
        self.create_container(self.account, self.container)

    def tearDown(self):
        # list_objects() returns a response object; split its body into
        # object names (as the other tearDowns do) instead of iterating
        # the response itself.
        listing = self.list_objects(self.account, self.container)
        for name in get_content_splitted(listing):
            self.delete_object(self.account, self.container, name)
        self.delete_container(self.account, self.container)

    def test_get_meta(self):
        # After one upload the container must report one object, the
        # matching byte count, a fresh Last-Modified and the object's
        # metadata key.
        headers = {'HTTP_X_OBJECT_META_TRASH':'true'}
        t1 = datetime.datetime.utcnow()
        o = self.upload_random_data(self.account,
                                    self.container,
                                    'McIntosh.jpg',
                                    meta=headers)
        # NOTE(review): a falsy upload result makes this test pass without
        # checking anything -- confirm that is intended.
        if o:
            r = self.get_container_meta(self.account,
                                        self.container)
            self.assertEqual(r['X-Container-Object-Count'], '1')
            self.assertEqual(r['X-Container-Bytes-Used'], str(len(o['data'])))
            t2 = datetime.datetime.strptime(r['Last-Modified'], DATE_FORMATS[2])
            # Last-Modified must be under one second past the pre-upload time.
            delta = (t2 - t1)
            threshold = datetime.timedelta(seconds=1)
            self.assertTrue(delta < threshold)
            self.assertTrue(r['X-Container-Object-Meta'])
            self.assertTrue('Trash' in r['X-Container-Object-Meta'])
+
+class ContainerGet(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.account = 'test'
def tearDown(self):
    # Empty every test container, then delete it.
    for container in self.container:
        listing = self.list_objects(self.account, container)
        for name in get_content_splitted(listing):
            self.delete_object(self.account, container, name)
        self.delete_container(self.account, container)
def test_list_objects(self):
response = self.list_objects(self.account, self.container[0])
self.container[0],
o['name'],
**meta)
-
+
r = self.list_objects(self.account,
self.container[0],
meta='Quality')
self.assertEqual(r.status_code, 200)
obj = get_content_splitted(r)
self.assertEqual(len(obj), 2)
- self.assertTrue(obj, [o['name'] for o in self.obj[:2]])
+ self.assertTrue(obj, [o['name'] for o in self.obj[:2]])
-class ContainerMeta(BaseTestCase):
- def setUp(self):
- BaseTestCase.setUp(self)
- self.account = 'test'
- self.container = 'apples'
- self.create_container(self.account, self.container)
+ def test_if_modified_since(self):
+ t = datetime.datetime.utcnow()
+ t2 = t - datetime.timedelta(minutes=10)
+
+        #add a new object
+ self.upload_random_data(self.account,
+ self.container[0],
+ 'dummy.txt')
- def tearDown(self):
- for o in self.list_objects(self.account, self.container):
- self.delete_object(self.account, self.container, o)
- self.delete_container(self.account, self.container)
-
- def test_get_meta(self):
- headers = {'HTTP_X_OBJECT_META_TRASH':'true'}
- t1 = datetime.datetime.utcnow()
- o = self.upload_random_data(self.account,
- self.container,
- 'McIntosh.jpg',
- meta=headers)
- if o:
- r = self.get_container_meta(self.account,
- self.container)
- self.assertEqual(r['X-Container-Object-Count'], '1')
- self.assertEqual(r['X-Container-Bytes-Used'], str(len(o['data'])))
- t2 = datetime.datetime.strptime(r['Last-Modified'], DATE_FORMATS[2])
- delta = (t2 - t1)
- threashold = datetime.timedelta(seconds=1)
- self.assertTrue(delta < threashold)
- self.assertTrue(r['X-Container-Object-Meta'])
- self.assertTrue('Trash' in r['X-Container-Object-Meta'])
+ for f in DATE_FORMATS:
+ past = t2.strftime(f)
+
+ headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %past}
+ r = self.list_objects(self.account,
+ self.container[0], **headers)
+
+ #assert get success
+ self.assertEqual(r.status_code, 200)
- def test_update_meta(self):
- meta = {'HTTP_X_CONTAINER_META_TEST':'test33',
- 'HTTP_X_CONTAINER_META_TOST':'tost22'}
- response = self.update_container_meta(self.account, self.container, **meta)
- response = self.get_container_meta(self.account, self.container)
- for k,v in meta.items():
- key = '-'.join(elem.capitalize() for elem in k.split('_')[1:])
- self.assertTrue(response[key])
- self.assertEqual(response[key], v)
+ def test_if_modified_since_invalid_date(self):
+ headers = {'HTTP_IF_MODIFIED_SINCE':''}
+ r = self.list_objects(self.account,
+ self.container[0], **headers)
+
+ #assert get success
+ self.assertEqual(r.status_code, 200)
+
+ def test_if_not_modified_since(self):
+ now = datetime.datetime.utcnow()
+ since = now + datetime.timedelta(1)
+
+ for f in DATE_FORMATS:
+ headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %since.strftime(f)}
+ r = self.list_objects(self.account,
+ self.container[0], **headers)
+
+ #assert not modified
+ self.assertEqual(r.status_code, 304)
+
+ def test_if_unmodified_since(self):
+ now = datetime.datetime.utcnow()
+ since = now + datetime.timedelta(1)
+
+ for f in DATE_FORMATS:
+ headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %since.strftime(f)}
+ r = self.list_objects(self.account,
+ self.container[0], **headers)
+
+ #assert success
+ self.assertEqual(r.status_code, 200)
+ self.assertEqual(get_content_splitted(r),
+ get_content_splitted(self.list_objects(self.account,
+ self.container[0])))
-class CreateContainer(BaseTestCase):
+ def test_if_unmodified_since_precondition_failed(self):
+ t = datetime.datetime.utcnow()
+ t2 = t - datetime.timedelta(minutes=10)
+
+ #add a new container
+ self.create_container(self.account,
+ 'dummy')
+
+ for f in DATE_FORMATS:
+ past = t2.strftime(f)
+
+ headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %past}
+ r = self.list_objects(self.account,
+ self.container[0], **headers)
+
+ #assert get success
+ self.assertEqual(r.status_code, 412)
+
+class ContainerPut(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.account = 'test'
if response.status_code == 201:
self.assertTrue(self.create_container(self.account, self.containers[0]).status_code, 202)
-class DeleteContainer(BaseTestCase):
class ContainerPost(BaseTestCase):
    """Container metadata update checks run against an 'apples' container."""

    def setUp(self):
        BaseTestCase.setUp(self)
        self.account = 'test'
        self.container = 'apples'
        self.create_container(self.account, self.container)

    def tearDown(self):
        # list_objects() returns a response object; split its body into
        # object names (as the other tearDowns do) instead of iterating
        # the response itself.
        listing = self.list_objects(self.account, self.container)
        for name in get_content_splitted(listing):
            self.delete_object(self.account, self.container, name)
        self.delete_container(self.account, self.container)

    def test_update_meta(self):
        # Metadata sent in an update must be readable back unchanged.
        meta = {'HTTP_X_CONTAINER_META_TEST':'test33',
                'HTTP_X_CONTAINER_META_TOST':'tost22'}
        self.update_container_meta(self.account, self.container, **meta)
        response = self.get_container_meta(self.account, self.container)
        for k, v in meta.items():
            # 'HTTP_X_CONTAINER_META_TEST' -> 'X-Container-Meta-Test'
            key = '-'.join(elem.capitalize() for elem in k.split('_')[1:])
            self.assertTrue(response[key])
            self.assertEqual(response[key], v)
+
+class ContainerDelete(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.account = 'test'
def test_delete_invalid(self):
self.assertItemNotFound(self.delete_container(self.account, 'c3'))
-class GetObjects(BaseTestCase):
class ObjectHead(BaseTestCase):
    """Placeholder: this class defines no tests of its own."""
+
+class ObjectGet(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.account = 'test'
#assert content
self.assertTrue(self.objects[0]['data'][-500:], r.content)
-
+
def test_get_range_not_satisfiable(self):
#perform get with range
offset = len(self.objects[0]['data']) + 1
self.containers[1],
self.objects[0]['name'],
**headers)
-
#assert precondition failed
self.assertEqual(r.status_code, 412)
def test_if_none_match(self):
- #perform get with If-Match
+ #perform get with If-None-Match
headers = {'HTTP_IF_NONE_MATCH':'123'}
r = self.get_object(self.account,
self.containers[1],
self.assertEqual(r.status_code, 200)
def test_if_none_match(self):
- #perform get with If-Match *
+ #perform get with If-None-Match *
headers = {'HTTP_IF_NONE_MATCH':'*'}
r = self.get_object(self.account,
self.containers[1],
#assert get success
self.assertEqual(r.status_code, 304)
-
+
def test_if_none_match_not_modified(self):
- #perform get with If-Match
+ #perform get with If-None-Match
headers = {'HTTP_IF_NONE_MATCH':'%s' %self.objects[0]['hash']}
r = self.get_object(self.account,
self.containers[1],
#assert not modified
self.assertEqual(r.status_code, 304)
self.assertEqual(r['ETag'], self.objects[0]['hash'])
-
- def test_get_with_if_modified_since(self):
+
+ def test_if_modified_since(self):
t = datetime.datetime.utcnow()
t2 = t - datetime.timedelta(minutes=10)
#assert get success
self.assertEqual(r.status_code, 200)
-
- def test_get_modified_since_invalid_date(self):
+
+ def test_if_modified_since_invalid_date(self):
headers = {'HTTP_IF_MODIFIED_SINCE':''}
r = self.get_object(self.account,
self.containers[1],
#assert get success
self.assertEqual(r.status_code, 200)
- def test_get_not_modified_since(self):
+ def test_if_not_modified_since(self):
now = datetime.datetime.utcnow()
since = now + datetime.timedelta(1)
#assert not modified
self.assertEqual(r.status_code, 304)
- def test_get_with_if_unmodified_since(self):
- return
+ def test_if_unmodified_since(self):
+ now = datetime.datetime.utcnow()
+ since = now + datetime.timedelta(1)
+
+ for f in DATE_FORMATS:
+ headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %since.strftime(f)}
+ r = self.get_object(self.account,
+ self.containers[1],
+ self.objects[0]['name'],
+ **headers)
+ #assert success
+ self.assertEqual(r.status_code, 200)
+ self.assertEqual(self.objects[0]['data'], r.content)
-class UploadObject(BaseTestCase):
+ def test_if_unmodified_since_precondition_failed(self):
+ t = datetime.datetime.utcnow()
+ t2 = t - datetime.timedelta(minutes=10)
+
+ #modify the object
+ self.upload_object(self.account,
+ self.containers[1],
+ self.objects[0]['name'],
+ self.objects[0]['data'][:200])
+
+ for f in DATE_FORMATS:
+ past = t2.strftime(f)
+
+ headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %past}
+ r = self.get_object(self.account,
+ self.containers[1],
+ self.objects[0]['name'],
+ **headers)
+ #assert get success
+ self.assertEqual(r.status_code, 412)
+
+ def test_hashes(self):
+ #block_size = 4 * 1024 * 1024
+ block_size = 128 * 1024
+ block_num = 2
+ l = block_size * block_num + 1
+ fname = 'largefile.txt'
+ o = self.upload_random_data(self.account,
+ self.containers[1],
+ fname,
+ l)
+ if o:
+ r = self.get_object(self.account,
+ self.containers[1],
+ fname,
+ 'json')
+ hashes = json.loads(r.content)['hashes']
+ self.assertTrue(len(hashes), block_num + 1)
+ i = 0
+ for h in hashes:
+ start = i * block_size
+ end = (i + 1) * block_size
+ self.assertEqual(h, compute_block_hash(o['data'][start:end]))
+ i += 1
+
+class ObjectPut(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.account = 'test'
# delete test file
os.remove(self.dest)
-
+
def test_upload(self):
filename = 'tests.py'
fullpath = os.path.join('.', 'api', filename)
f = open(fullpath, 'r')
data = f.read()
- hash = compute_hash(data)
+ hash = compute_md5_hash(data)
meta={'HTTP_ETAG':hash,
'HTTP_X_OBJECT_MANIFEST':123,
'HTTP_X_OBJECT_META_TEST':'test1'
content_type = meta['HTTP_CONTENT_TYPE'],
**meta)
self.assertEqual(r.status_code, 422)
-
+
def test_chucked_update(self):
objname = os.path.split(self.src)[-1:][0]
f = open(self.dest, 'r')
actual_data = f.read()
self.assertEqual(actual_data, uploaded_data)
-class CopyObject(BaseTestCase):
+class ObjectCopy(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.account = 'test'
**meta)
self.assertItemNotFound(r)
-
#copy from invalid container
meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
src_path = os.path.join('/', self.containers[1], self.obj['name'])
**meta)
self.assertItemNotFound(r)
-class MoveObject(CopyObject):
+class ObjectMove(ObjectCopy):
def test_move(self):
#perform move
meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
r = self.get_object_meta(self.account, self.containers[0], self.obj['name'])
self.assertItemNotFound(r)
-class UpdateObjectMeta(BaseTestCase):
+class ObjectPost(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.account = 'test'
self.assertTrue(r[key])
self.assertTrue(r[key], v)
-class DeleteObject(BaseTestCase):
+class ObjectDelete(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.account = 'test'
#assert success
self.assertEqual(r.status_code, 204)
-
+
def test_delete_invalid(self):
#perform delete object
r = self.delete_object(self.account, self.containers[1], self.obj['name'])
if response:
return response.content.split('\n')
-def compute_hash(data):
def compute_md5_hash(data):
    """Return the lowercase hexadecimal MD5 digest of `data`.

    `data` may be a byte string or unicode text; text is encoded as
    UTF-8 before hashing.
    """
    # Only unicode text is encoded; byte strings and other buffer types
    # go to the digest untouched.
    if isinstance(data, type(u'')):
        data = data.encode('utf-8')
    md5 = hashlib.md5()
    md5.update(data)
    return md5.hexdigest().lower()
def compute_block_hash(data):
    """Return the hex digest of `data` with trailing NUL characters removed.

    The digest algorithm is the one named by `backend.hash_algorithm`.
    """
    digest = hashlib.new(backend.hash_algorithm)
    trimmed = data.rstrip('\x00')
    digest.update(trimmed)
    return digest.hexdigest()
+
def create_chunked_update_test_file(src, dest):
fr = open(src, 'r')
fw = open(dest, 'w')