def setUp(self):
self.client = Pithos_Client(get_server(), get_auth(), get_user(),
get_api())
+ self._clean_account()
self.invalid_client = Pithos_Client(get_server(), get_auth(), 'invalid',
get_api())
- #self.headers = {
- # 'account': ('x-account-container-count',
- # 'x-account-bytes-used',
- # 'last-modified',
- # 'content-length',
- # 'date',
- # 'content_type',
- # 'server',),
- # 'object': ('etag',
- # 'content-length',
- # 'content_type',
- # 'content-encoding',
- # 'last-modified',
- # 'date',
- # 'x-object-manifest',
- # 'content-range',
- # 'x-object-modified-by',
- # 'x-object-version',
- # 'x-object-version-timestamp',
- # 'server',),
- # 'container': ('x-container-object-count',
- # 'x-container-bytes-used',
- # 'content_type',
- # 'last-modified',
- # 'content-length',
- # 'date',
- # 'x-container-block-size',
- # 'x-container-block-hash',
- # 'x-container-policy-quota',
- # 'x-container-policy-versioning',
- # 'server',
- # 'x-container-object-meta',
- # 'x-container-policy-versioning',
- # 'server',)}
- #
- #self.contentTypes = {'xml':'application/xml',
- # 'json':'application/json',
- # '':'text/plain'}
+
+ #keep track of initial account groups
+ self.initial_groups = self.client.retrieve_account_groups()
+
+ #keep track of initial account meta
+ self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
+
self.extended = {
'container':(
'name',
'content_type',
'content_encoding',
'last_modified',)}
- self.return_codes = (400, 401, 404, 503,)
+ self.return_codes = (400, 401, 403, 404, 503,)
def tearDown(self):
+        """Remove account meta and groups added since setUp, then call _clean_account()."""
+        #delete additionally created meta
+        current_meta = self.client.retrieve_account_metadata(restricted=True)
+        extra_meta = [m for m in current_meta if m not in self.initial_meta]
+        self.client.delete_account_metadata(extra_meta)
+
+        #delete additionally created groups
+        current_groups = self.client.retrieve_account_groups()
+        extra_groups = [g for g in current_groups if g not in self.initial_groups]
+        self.client.unset_account_groups(extra_groups)
+        self._clean_account()
+
+ def _clean_account(self):
for c in self.client.list_containers():
while True:
#list objects returns at most 10000 objects
l.append(codes)
self.assertTrue(status in l)
- #def assert_headers(self, headers, type, **exp_meta):
- # prefix = 'x-%s-meta-' %type
- # system_headers = [h for h in headers if not h.startswith(prefix)]
- # for k,v in headers.items():
- # if k in system_headers:
- # self.assertTrue(k in headers[type])
- # elif exp_meta:
- # k = k.split(prefix)[-1]
- # self.assertEqual(v, exp_meta[k])
-
def assert_extended(self, data, format, type, size=10000):
if format == 'xml':
self._assert_xml(data, type, size)
r = callableObj(*args, **kwargs)
self.fail('Should never reach here')
except Fault, f:
- self.failUnless(f.status == status)
+ if type(status) == types.ListType:
+ self.failUnless(f.status in status)
+ else:
+ self.failUnless(f.status == status)
def assert_not_raises_fault(self, status, callableObj, *args, **kwargs):
"""
self.assert_raises_fault(404, self.client.retrieve_object_metadata,
container, object)
+    def assert_versionlist_structure(self, versionlist):
+        """Assert that versionlist is a list whose elements are two-item lists."""
+        #isinstance is the idiomatic type check and needs no 'types' import
+        self.assertTrue(isinstance(versionlist, list))
+        for elem in versionlist:
+            self.assertTrue(isinstance(elem, list))
+            self.assertEqual(len(elem), 2)
+
def upload_random_data(self, container, name, length=1024, type=None,
enc=None, **meta):
data = get_random_data(length)
obj['meta'] = args
path = '/%s/%s' % (container, name)
- self.client.create_object(container, name, StringIO(obj['data']),
- meta, **args)
+ self.client.create_object(container, name, f=StringIO(obj['data']),
+ meta=meta, **args)
return obj
except IOError:
for item in self.containers:
self.client.create_container(item)
- #keep track of initial account groups
- self.initial_groups = self.client.retrieve_account_groups()
-
- #keep track of initial account meta
- self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
-
meta = {'foo':'bar'}
self.client.update_account_metadata(**meta)
- self.updated_meta = self.initial_meta.update(meta)
-
- def tearDown(self):
- #delete additionally created meta
- l = []
- for m in self.client.retrieve_account_metadata(restricted=True):
- if m not in self.initial_meta:
- l.append(m)
- self.client.delete_account_metadata(l)
-
- #delete additionally created groups
- l = []
- for g in self.client.retrieve_account_groups():
- if g not in self.initial_groups:
- l.append(g)
- self.client.unset_account_groups(l)
-
- BaseTestCase.tearDown(self)
+ #self.updated_meta = self.initial_meta.update(meta)
def test_get_account_meta(self):
meta = self.client.retrieve_account_metadata()
size = size + int(m['x-container-bytes-used'])
self.assertEqual(meta['x-account-bytes-used'], str(size))
- def test_get_account_401(self):
- self.assert_raises_fault(401,
+ def test_get_account_403(self):
+ self.assert_raises_fault(403,
self.invalid_client.retrieve_account_metadata)
def test_get_account_meta_until(self):
containers = self.client.list_containers()
self.assertEquals(self.containers, containers)
- def test_list_401(self):
- self.assert_raises_fault(401, self.invalid_client.list_containers)
+ def test_list_403(self):
+ self.assert_raises_fault(403, self.invalid_client.list_containers)
def test_list_with_limit(self):
limit = 2
for item in self.containers:
self.client.create_container(item)
- #keep track of initial account groups
- self.initial_groups = self.client.retrieve_account_groups()
-
- #keep track of initial account meta
- self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
-
meta = {'foo':'bar'}
self.client.update_account_metadata(**meta)
- self.updated_meta = self.initial_meta.update(meta)
-
- def tearDown(self):
- #delete additionally created meta
- l = []
- for m in self.client.retrieve_account_metadata(restricted=True):
- if m not in self.initial_meta:
- l.append(m)
- self.client.delete_account_metadata(l)
-
- #delete additionally created groups
- l = []
- for g in self.client.retrieve_account_groups():
- if g not in self.initial_groups:
- l.append(g)
- self.client.unset_account_groups(l)
-
- BaseTestCase.tearDown(self)
+ #self.updated_meta = self.initial_meta.update(meta)
def test_update_meta(self):
with AssertMappingInvariant(self.client.retrieve_account_groups):
def test_invalid_account_update_meta(self):
meta = {'test':'test', 'tost':'tost'}
- self.assert_raises_fault(401,
+ self.assert_raises_fault(403,
self.invalid_client.update_account_metadata,
**meta)
self.assert_extended(objects, 'xml', 'object')
node_name = objects.getElementsByTagName('name')[0]
self.assertEqual(node_name.firstChild.data, '/objectname')
-
- #objects = self.client.list_objects('test', prefix='/')
- #self.assertEqual(objects, ['/objectname'])
- #
- #objects = self.client.list_objects('test', path='/')
- #self.assertEqual(objects, ['/objectname'])
- #
- #objects = self.client.list_objects('test', prefix='/', delimiter='n')
- #self.assertEqual(objects, ['/object'])
def test_list_objects_with_limit_marker(self):
objects = self.client.list_objects(self.container[0], limit=2)
self.client.create_container(self.containers[0])
self.assertTrue(not self.client.create_container(self.containers[0]))
+    def test_quota(self):
+        """A container quota shows up in the metadata and rejects larger uploads."""
+        #use the container this test creates instead of a hard-coded name
+        container = self.containers[0]
+        self.client.create_container(container)
+
+        policy = {'quota':100}
+        self.client.set_container_policies(container, **policy)
+
+        meta = self.client.retrieve_container_metadata(container)
+        self.assertTrue('x-container-policy-quota' in meta)
+        self.assertEqual(meta['x-container-policy-quota'], '100')
+
+        #an upload one byte larger than the quota must fail with 413
+        args = [container, 'o1']
+        kwargs = {'length':101}
+        self.assert_raises_fault(413, self.upload_random_data, *args, **kwargs)
+
+        #reset quota
+        policy = {'quota':0}
+        self.client.set_container_policies(container, **policy)
+
class ContainerPost(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.containers = ['c1', 'c2']
for c in self.containers:
self.client.create_container(c)
- self.upload_random_data(self.containers[1], o_names[0])
def test_delete(self):
status = self.client.delete_container(self.containers[0])[0]
self.assertEqual(status, 204)
def test_delete_non_empty(self):
+ self.upload_random_data(self.containers[1], o_names[0])
self.assert_raises_fault(409, self.client.delete_container,
self.containers[1])
def test_delete_invalid(self):
self.assert_raises_fault(404, self.client.delete_container, 'c3')
-class ObjectHead(BaseTestCase):
- pass
-
class ObjectGet(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
for n in names:
self.objects.append(self.upload_random_data(self.containers[1], n))
+    def test_versions(self):
+        """The version list gains one entry per metadata update and per data update."""
+        container = self.containers[1]
+        obj = self.objects[0]
+        name = obj['name']
+
+        def fetch_versions():
+            #retrieve the current version list and validate its structure
+            versions = self.client.retrieve_object_versionlist(
+                container, name)['versions']
+            self.assert_versionlist_structure(versions)
+            return versions
+
+        initial = fetch_versions()
+
+        #update meta
+        meta = {'quality':'AAA', 'stock':True}
+        self.client.update_object_metadata(container, name, **meta)
+
+        after_meta = fetch_versions()
+        self.assertEqual(len(initial)+1, len(after_meta))
+        self.assertEqual(initial, after_meta[:-1])
+
+        #the version right before the meta update must not carry the new keys
+        previous = after_meta[-2][0]
+        previous_meta = self.client.retrieve_object_metadata(container, name,
+                                                             restricted=True,
+                                                             version=previous)
+        for key in meta:
+            self.assertTrue(key not in previous_meta)
+
+        #update object
+        data = get_random_data()
+        self.client.update_object(container, name, StringIO(data))
+
+        after_data = fetch_versions()
+        self.assertEqual(len(after_meta)+1, len(after_data))
+        self.assertEqual(after_meta, after_data[:-1])
+
+        #the third entry from the end is the version that existed before both updates
+        original = after_data[-3][0]
+        original_data = self.client.retrieve_object_version(container, name,
+                                                            version=original)
+        self.assertEqual(obj['data'], original_data)
+        self.assertEqual(self.client.retrieve_object(container, name),
+                         '%s%s' %(original_data, data))
+
def test_get(self):
#perform get
o = self.client.retrieve_object(self.containers[1],
self.objects[0]['meta'])
self.assertEqual(o, self.objects[0]['data'])
+    def test_objects_with_trailing_spaces(self):
+        """Names that differ only by a trailing space refer to different objects."""
+        container = 'test'
+        plain = 'a'
+        padded = 'a '
+        self.client.create_container(container)
+
+        #with only 'a' uploaded, 'a ' must not be found
+        self.upload_random_data(container, plain)
+        self.assert_raises_fault(404, self.client.retrieve_object,
+                                 container, padded)
+
+        #after deleting 'a' it must not be found either
+        self.client.delete_object(container, plain)
+        self.assert_raises_fault(404, self.client.retrieve_object,
+                                 container, plain)
+
+        #with only 'a ' uploaded, 'a' must not be found
+        self.upload_random_data(container, padded)
+        self.assert_raises_fault(404, self.client.retrieve_object,
+                                 container, plain)
+
def test_get_invalid(self):
self.assert_raises_fault(404, self.client.retrieve_object,
self.containers[0], self.objects[0]['name'])
self.assertEqual(int(zero_meta['content-length']), 0)
self.assertEqual(zero_hash, [])
self.assertEqual(zero_data, '')
+
+    def test_create_object_by_hashmap(self):
+        """An object created from another object's hashmap must have the same content."""
+        c = self.container
+        o = 'object'
+        self.upload_random_data(c, o)
+        #the format='json' response of the source object is passed on as the hashmap
+        hashmap = self.client.retrieve_object(c, o, format='json')
+        o2 = 'object-copy'
+        self.client.create_object_by_hashmap(c, o2, hashmap)
+        #compare the source against the copy, not against itself
+        self.assertEqual(self.client.retrieve_object(c, o),
+                         self.client.retrieve_object(c, o2))
class ObjectCopy(BaseTestCase):
def setUp(self):
for c in self.containers:
self.client.create_container(c)
self.obj = self.upload_random_data(self.containers[0], o_names[0])
-
- def tearDown(self):
- pass
-
+
def test_copy(self):
with AssertMappingInvariant(self.client.retrieve_object_metadata,
self.containers[0], self.obj['name']):
self.assert_raises_fault(400, self.test_update_object,
content_length = 1000)
- def test_update_object_invalid_range(self):
+ def _test_update_object_invalid_range(self):
with AssertContentInvariant(self.client.retrieve_object,
self.containers[0], self.obj[0]['name']):
self.assert_raises_fault(416, self.test_update_object, 499, 0, True)
- def test_update_object_invalid_range_and_length(self):
+ def _test_update_object_invalid_range_and_length(self):
with AssertContentInvariant(self.client.retrieve_object,
self.containers[0], self.obj[0]['name']):
- self.assert_raises_fault(416, self.test_update_object, 499, 0, True,
+ self.assert_raises_fault([400, 416], self.test_update_object, 499, 0, True,
-1)
def test_update_object_invalid_range_with_no_content_length(self):
class ListSharing(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
+ for i in range(2):
+ self.client.create_container('c%s' %i)
self.client.create_container('c')
for i in range(2):
- self.upload_random_data('c', 'o%s' %i)
+ self.upload_random_data('c1', 'o%s' %i)
accounts = OTHER_ACCOUNTS.copy()
self.o1_sharing_with = accounts.popitem()
self.o1_sharing = [self.o1_sharing_with[1]]
- self.client.share_object('c', 'o1', self.o1_sharing, read=True)
+ self.client.share_object('c1', 'o1', self.o1_sharing, read=True)
l = []
for i in range(2):
l.append(accounts.popitem())
- #self.client.set_account_groups({'pithos-dev':'chazapis,verigak,papagian'})
- #self.o2_sharing = 'write=%s' %
- #self.client.share_object('c', 'o2', self.o2_sharing)
- def test_listing(self):
+ def test_list_other_shared(self):
self.other = Pithos_Client(get_server(),
self.o1_sharing_with[0],
self.o1_sharing_with[1],
get_api())
self.assertTrue(get_user() in self.other.list_shared_by_others())
-
-class TestGreek(BaseTestCase):
- def setUp(self):
- BaseTestCase.setUp(self)
- #keep track of initial account groups
- self.initial_groups = self.client.retrieve_account_groups()
-
- #keep track of initial account meta
- self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
- def tearDown(self):
- #delete additionally created meta
- l = []
- for m in self.client.retrieve_account_metadata(restricted=True):
- if m not in self.initial_meta:
- l.append(m)
- self.client.delete_account_metadata(l)
+    def test_list_my_shared(self):
+        """Shared listings contain what setUp shared and omit what it did not."""
+        my_shared_containers = self.client.list_containers(shared=True)
+        self.assertTrue('c1' in my_shared_containers)
+        #'c0' is created in setUp but nothing in it is shared
+        self.assertTrue('c0' not in my_shared_containers)
- #delete additionally created groups
- l = []
- for g in self.client.retrieve_account_groups():
- if g not in self.initial_groups:
- l.append(g)
- self.client.unset_account_groups(l)
-
- BaseTestCase.tearDown(self)
+        my_shared_objects = self.client.list_objects('c1', shared=True)
+        self.assertTrue('o1' in my_shared_objects)
+        #'o0' is uploaded to 'c1' in setUp but never shared
+        self.assertTrue('o0' not in my_shared_objects)
+class TestGreek(BaseTestCase):
def test_create_container(self):
self.client.create_container('φάκελος')
self.assert_container_exists('φάκελος')
'0009',
'διογένης',
get_api())
- self.assert_not_raises_fault(401, chef.retrieve_object_metadata,
+ self.assert_not_raises_fault(403, chef.retrieve_object_metadata,
'φάκελος', 'ο1', account=get_user())
#check write access
self.client.share_object('φάκελος', 'ο1', ['διογένης'], read=False)
new_data = get_random_data()
- self.assert_not_raises_fault(401, chef.update_object,
+ self.assert_not_raises_fault(403, chef.update_object,
'φάκελος', 'ο1', StringIO(new_data),
account=get_user())
class TestPermissions(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- #keep track of initial account groups
- self.initial_groups = self.client.retrieve_account_groups()
- #keep track of initial account meta
- self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
#create a group
- self.authorized = ['chazapis', 'verigak', 'gtsouk', 'papagian']
+ self.authorized = ['chazapis', 'verigak', 'gtsouk']
groups = {'pithosdev':','.join(self.authorized)}
self.client.set_account_groups(**groups)
- def tearDown(self):
- #delete additionally created meta
- l = []
- for m in self.client.retrieve_account_metadata(restricted=True):
- if m not in self.initial_meta:
- l.append(m)
- self.client.delete_account_metadata(l)
-
- #delete additionally created groups
- l = []
- for g in self.client.retrieve_account_groups():
- if g not in self.initial_groups:
- l.append(g)
- self.client.unset_account_groups(l)
-
- BaseTestCase.tearDown(self)
-
- def test_read_access(self):
- self.client.create_container('c')
- o = self.upload_random_data('c', 'o')
- self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()])
+ def assert_read(self, authorized=[], any=False):
for token, account in OTHER_ACCOUNTS.items():
cl = Pithos_Client(get_server(), token, account, get_api())
- if account in self.authorized:
- self.assert_not_raises_fault(401, cl.retrieve_object_metadata,
+ if account in authorized or any:
+ self.assert_not_raises_fault(403, cl.retrieve_object_metadata,
'c', 'o', account=get_user())
else:
- self.assert_raises_fault(401, cl.retrieve_object_metadata,
+ self.assert_raises_fault(403, cl.retrieve_object_metadata,
'c', 'o', account=get_user())
+ #check inheritance
+ o = self.upload_random_data('c', 'o/also-shared')
+ for token, account in OTHER_ACCOUNTS.items():
+ cl = Pithos_Client(get_server(), token, account, get_api())
+ if account in authorized or any:
+ self.assert_not_raises_fault(403, cl.retrieve_object_metadata,
+ 'c', 'o/also-shared', account=get_user())
+ else:
+ self.assert_raises_fault(403, cl.retrieve_object_metadata,
+ 'c', 'o/also-shared', account=get_user())
- def test_write_access(self):
- self.client.create_container('c')
- o = self.upload_random_data('c', 'o')
- self.client.share_object('c', 'o', ['chazapis'], read=False)
- o_data = o['data']
+ def assert_write(self, o_data, authorized=[], any=False):
for token, account in OTHER_ACCOUNTS.items():
cl = Pithos_Client(get_server(), token, account, get_api())
new_data = get_random_data()
- if account in [get_user(), 'chazapis']:
- self.assert_not_raises_fault(401, cl.update_object,
+ if account in authorized or any:
+ # test write access
+ self.assert_not_raises_fault(403, cl.update_object,
'c', 'o', StringIO(new_data),
account=get_user())
- server_data = self.client.retrieve_object('c', 'o')
- self.assertEqual(o_data, server_data[:len(o_data)])
- self.assertEqual(new_data, server_data[len(o_data):])
- o_data = server_data
+ try:
+ # test read access
+ server_data = cl.retrieve_object('c', 'o', account=get_user())
+ self.assertEqual(o_data, server_data[:len(o_data)])
+ self.assertEqual(new_data, server_data[len(o_data):])
+ o_data = server_data
+ except Fault, f:
+ self.failIf(f.status == 403)
else:
- self.assert_raises_fault(401, cl.update_object,
+ self.assert_raises_fault(403, cl.update_object,
'c', 'o', StringIO(new_data),
account=get_user())
+
+ #check inheritance
+ o = self.upload_random_data('c', 'o/also-shared')
+ o_data = o['data']
+ for token, account in OTHER_ACCOUNTS.items():
+ cl = Pithos_Client(get_server(), token, account, get_api())
+ new_data = get_random_data()
+ if account in authorized or any:
+ # test write access
+ self.assert_not_raises_fault(403, cl.update_object,
+ 'c', o['name'],
+ StringIO(new_data),
+ account=get_user())
+ try:
+ server_data = cl.retrieve_object('c', o['name'], account=get_user())
+ self.assertEqual(o_data, server_data[:len(o_data)])
+ self.assertEqual(new_data, server_data[len(o_data):])
+ o_data = server_data
+ except Fault, f:
+ self.failIf(f.status == 403)
+ else:
+ self.assert_raises_fault(403, cl.update_object,
+ 'c', o['name'],
+ StringIO(new_data),
+ account=get_user())
+
+    def test_group_read(self):
+        """Sharing with the user's 'pithosdev' group lets the authorized accounts read."""
+        self.client.create_container('c')
+        #the uploaded object's description is not needed here
+        self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()])
+        self.assert_read(authorized=self.authorized)
+
+    def test_read_many(self):
+        """Sharing with an explicit list of accounts lets exactly those accounts read."""
+        #test read access
+        self.client.create_container('c')
+        #the uploaded object's description is not needed here
+        self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', self.authorized)
+        self.assert_read(authorized=self.authorized)
+
+    def test_read_by_everyone(self):
+        """Sharing with '*' lets every other account read."""
+        self.client.create_container('c')
+        #the uploaded object's description is not needed here
+        self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['*'])
+        self.assert_read(any=True)
+
+    def test_group_write(self):
+        """Write-sharing with the user's 'pithosdev' group lets the authorized accounts write."""
+        self.client.create_container('c')
+        initial_data = self.upload_random_data('c', 'o')['data']
+        group = '%s:pithosdev' % get_user()
+        self.client.share_object('c', 'o', [group], read=False)
+        self.assert_write(initial_data, authorized=self.authorized)
+
+    def test_write_many(self):
+        """Write-sharing with an explicit list of accounts lets exactly those accounts write."""
+        self.client.create_container('c')
+        initial_data = self.upload_random_data('c', 'o')['data']
+        writers = self.authorized
+        self.client.share_object('c', 'o', writers, read=False)
+        self.assert_write(initial_data, authorized=writers)
+
+    def test_write_by_everyone(self):
+        """Write-sharing with '*' lets every other account write."""
+        self.client.create_container('c')
+        o_data = self.upload_random_data('c', 'o')['data']
+        self.client.share_object('c', 'o', ['*'], read=False)
+        #pass the uploaded content so assert_write can compare it with server data
+        self.assert_write(o_data, any=True)
+
+class TestPublish(BaseTestCase):
+    """Tests for publishing an object under a public URL."""
+    def test_publish(self):
+        """A published object advertises a public URL that returns its data."""
+        container, name = 'c', 'o'
+        self.client.create_container(container)
+        expected = self.upload_random_data(container, name)['data']
+        self.client.publish_object(container, name)
+
+        #the public URL must be advertised in the object metadata
+        url = '/public/%s/c/o' % get_user()
+        meta = self.client.retrieve_object_metadata(container, name)
+        self.assertTrue('x-object-public' in meta)
+        self.assertEqual(meta['x-object-public'], url)
+
+        #fetch the URL through a client created with an empty api argument
+        public_client = Pithos_Client(get_server(), get_auth(), get_user(), api='')
+        data = public_client.get(url)[2]
+        self.assertEqual(expected, data)
class AssertMappingInvariant(object):
def __init__(self, callable, *args, **kwargs):
'photos/me.jpg']
if __name__ == "__main__":
- unittest.main()
+ if get_user() == 'test':
+ unittest.main()
+ else:
+ print 'Will not run tests as any other user except \'test\' (current user: %s).' % get_user()