X-Git-Url: https://code.grnet.gr/git/pithos/blobdiff_plain/2a1f76db6f7669c84f13c5804deee6196a3a7bc3..9a43d4b982d2e2e6a96d919571e454648111a5e2:/tools/test?ds=sidebyside diff --git a/tools/test b/tools/test index 97f854f..6d611cb 100755 --- a/tools/test +++ b/tools/test @@ -69,6 +69,7 @@ class BaseTestCase(unittest.TestCase): def setUp(self): self.client = Pithos_Client(get_server(), get_auth(), get_user(), get_api()) + self._clean_account() self.invalid_client = Pithos_Client(get_server(), get_auth(), 'invalid', get_api()) #self.headers = { @@ -127,6 +128,9 @@ class BaseTestCase(unittest.TestCase): self.return_codes = (400, 401, 404, 503,) def tearDown(self): + self._clean_account() + + def _clean_account(self): for c in self.client.list_containers(): while True: #list objects returns at most 10000 objects @@ -195,7 +199,10 @@ class BaseTestCase(unittest.TestCase): r = callableObj(*args, **kwargs) self.fail('Should never reach here') except Fault, f: - self.failUnless(f.status == status) + if type(status) == types.ListType: + self.failUnless(f.status in status) + else: + self.failUnless(f.status == status) def assert_not_raises_fault(self, status, callableObj, *args, **kwargs): """ @@ -239,6 +246,12 @@ class BaseTestCase(unittest.TestCase): self.assert_raises_fault(404, self.client.retrieve_object_metadata, container, object) + def assert_versionlist_structure(self, versionlist): + self.assertTrue(type(versionlist) == types.ListType) + for elem in versionlist: + self.assertTrue(type(elem) == types.ListType) + self.assertEqual(len(elem), 2) + def upload_random_data(self, container, name, length=1024, type=None, enc=None, **meta): data = get_random_data(length) @@ -267,8 +280,8 @@ class BaseTestCase(unittest.TestCase): obj['meta'] = args path = '/%s/%s' % (container, name) - self.client.create_object(container, name, StringIO(obj['data']), - meta, **args) + self.client.create_object(container, name, f=StringIO(obj['data']), + meta=meta, **args) return obj except IOError: @@ -277,7 +290,6 @@ class BaseTestCase(unittest.TestCase): class AccountHead(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears'] for item in self.containers: self.client.create_container(item) @@ -349,7 +361,6 @@ class AccountHead(BaseTestCase): class AccountGet(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' #create some containers self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears'] for item in self.containers: @@ -455,7 +466,6 @@ class AccountGet(BaseTestCase): class AccountPost(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears'] for item in self.containers: self.client.create_container(item) @@ -529,14 +539,15 @@ class AccountPost(BaseTestCase): groups = {'pithosdev':'verigak,gtsouk,chazapis'} self.client.set_account_groups(**groups) - self.assertEqual(groups, self.client.retrieve_account_groups()) + self.assertEqual(set(groups['pithosdev']), + set(self.client.retrieve_account_groups()['pithosdev'])) more_groups = {'clientsdev':'pkanavos,mvasilak'} self.client.set_account_groups(**more_groups) groups.update(more_groups) - self.assertEqual(set(groups['pithosdev']), - set(self.client.retrieve_account_groups()['pithosdev'])) + self.assertEqual(set(groups['clientsdev']), + set(self.client.retrieve_account_groups()['clientsdev'])) def test_reset_account_groups(self): with AssertMappingInvariant(self.client.retrieve_account_metadata): @@ -544,13 +555,16 @@ class AccountPost(BaseTestCase): 'clientsdev':'pkanavos,mvasilak'} self.client.set_account_groups(**groups) - self.assertEqual(groups, self.client.retrieve_account_groups()) + self.assertEqual(set(groups['pithosdev'].split(',')), + set(self.client.retrieve_account_groups()['pithosdev'].split(','))) + self.assertEqual(set(groups['clientsdev'].split(',')), + set(self.client.retrieve_account_groups()['clientsdev'].split(','))) groups = {'pithosdev':'verigak,gtsouk,chazapis,papagian'} self.client.reset_account_groups(**groups) - self.assertEqual(groups['pithosdev'].split(','), - self.client.retrieve_account_groups()['pithosdev'].split(',')) + self.assertEqual(set(groups['pithosdev'].split(',')), + set(self.client.retrieve_account_groups()['pithosdev'].split(','))) def test_delete_account_groups(self): with AssertMappingInvariant(self.client.retrieve_account_metadata): @@ -565,7 +579,6 @@ class AccountPost(BaseTestCase): class ContainerHead(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.container = 'apples' self.client.create_container(self.container) @@ -587,7 +600,6 @@ class ContainerHead(BaseTestCase): class ContainerGet(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.container = ['pears', 'apples'] for c in self.container: self.client.create_container(c) @@ -797,7 +809,6 @@ class ContainerGet(BaseTestCase): class ContainerPut(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.containers = ['c1', 'c2'] def test_create(self): @@ -810,10 +821,27 @@ class ContainerPut(BaseTestCase): self.client.create_container(self.containers[0]) self.assertTrue(not self.client.create_container(self.containers[0])) + def test_quota(self): + self.client.create_container(self.containers[0]) + + policy = {'quota':100} + self.client.set_container_policies('c1', **policy) + + meta = self.client.retrieve_container_metadata('c1') + self.assertTrue('x-container-policy-quota' in meta) + self.assertEqual(meta['x-container-policy-quota'], '100') + + args = ['c1', 'o1'] + kwargs = {'length':101} + self.assert_raises_fault(413, self.upload_random_data, *args, **kwargs) + + #reset quota + policy = {'quota':0} + self.client.set_container_policies('c1', **policy) + class ContainerPost(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.container = 'apples' self.client.create_container(self.container) @@ -830,17 +858,16 @@ class ContainerPost(BaseTestCase): class ContainerDelete(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.containers = ['c1', 'c2'] for c in self.containers: self.client.create_container(c) - self.upload_random_data(self.containers[1], o_names[0]) def test_delete(self): status = self.client.delete_container(self.containers[0])[0] self.assertEqual(status, 204) def test_delete_non_empty(self): + self.upload_random_data(self.containers[1], o_names[0]) self.assert_raises_fault(409, self.client.delete_container, self.containers[1]) @@ -853,7 +880,6 @@ class ObjectHead(BaseTestCase): class ObjectGet(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.containers = ['c1', 'c2'] #create some containers for c in self.containers: @@ -865,6 +891,45 @@ class ObjectGet(BaseTestCase): for n in names: self.objects.append(self.upload_random_data(self.containers[1], n)) + def test_versions(self): + c = self.containers[1] + o = self.objects[0] + b = self.client.retrieve_object_versionlist(c, o['name'])['versions'] + self.assert_versionlist_structure(b) + + #update meta + meta = {'quality':'AAA', 'stock':True} + self.client.update_object_metadata(c, o['name'], **meta) + + a = self.client.retrieve_object_versionlist(c, o['name'])['versions'] + self.assert_versionlist_structure(a) + self.assertEqual(len(b)+1, len(a)) + self.assertEqual(b, a[:-1]) + + #get exact previous version metadata + v = a[-2][0] + v_meta = self.client.retrieve_object_metadata(c, o['name'], + restricted=True, + version=v) + for k in meta.keys(): + self.assertTrue(k not in v_meta) + + #update obejct + data = get_random_data() + self.client.update_object(c, o['name'], StringIO(data)) + + aa = self.client.retrieve_object_versionlist(c, o['name'])['versions'] + self.assert_versionlist_structure(aa) + self.assertEqual(len(a)+1, len(aa)) + self.assertEqual(a, aa[:-1]) + + #get exact previous version + v = aa[-3][0] + v_data = self.client.retrieve_object_version(c, o['name'], version=v) + self.assertEqual(o['data'], v_data) + self.assertEqual(self.client.retrieve_object(c, o['name']), + '%s%s' %(v_data, data)) + def test_get(self): #perform get o = self.client.retrieve_object(self.containers[1], @@ -1171,9 +1236,8 @@ class ObjectGet(BaseTestCase): fname = 'largefile' o = self.upload_random_data(self.containers[1], fname, l) if o: - data = self.client.retrieve_object(self.containers[1], fname, + body = self.client.retrieve_object(self.containers[1], fname, format='json') - body = json.loads(data) hashes = body['hashes'] block_size = body['block_size'] block_hash = body['block_hash'] @@ -1190,7 +1254,6 @@ class ObjectGet(BaseTestCase): class ObjectPut(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.container = 'c1' self.client.create_container(self.container) @@ -1274,18 +1337,36 @@ class ObjectPut(BaseTestCase): self.assertEqual('', self.client.retrieve_object(self.container, 'large-object')) + def test_create_zero_length_object(self): + c = self.container + o = 'object' + zero = self.client.create_zero_length_object(c, o) + zero_meta = self.client.retrieve_object_metadata(c, o) + zero_hash = self.client.retrieve_object_hashmap(c, o) + zero_data = self.client.retrieve_object(c, o) + + self.assertEqual(int(zero_meta['content-length']), 0) + self.assertEqual(zero_hash, []) + self.assertEqual(zero_data, '') + + def test_create_object_by_hashmap(self): + c = self.container + o = 'object' + self.upload_random_data(c, o) + hashmap = self.client.retrieve_object(c, o, format='json') + o2 = 'object-copy' + self.client.create_object_by_hashmap(c, o2, hashmap) + self.assertEqual(self.client.retrieve_object(c, o), + self.client.retrieve_object(c, o)) + class ObjectCopy(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.containers = ['c1', 'c2'] for c in self.containers: self.client.create_container(c) self.obj = self.upload_random_data(self.containers[0], o_names[0]) - - def tearDown(self): - pass - + def test_copy(self): with AssertMappingInvariant(self.client.retrieve_object_metadata, self.containers[0], self.obj['name']): @@ -1348,7 +1429,6 @@ class ObjectCopy(BaseTestCase): class ObjectMove(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.containers = ['c1', 'c2'] for c in self.containers: self.client.create_container(c) @@ -1378,24 +1458,25 @@ class ObjectMove(BaseTestCase): class ObjectPost(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.containers = ['c1', 'c2'] for c in self.containers: self.client.create_container(c) - self.obj = self.upload_random_data(self.containers[0], o_names[0]) + self.obj = [] + for i in range(2): + self.obj.append(self.upload_random_data(self.containers[0], o_names[i])) def test_update_meta(self): #perform update metadata more = {'foo':'foo', 'bar':'bar'} status = self.client.update_object_metadata(self.containers[0], - self.obj['name'], + self.obj[0]['name'], **more)[0] #assert request accepted self.assertEqual(status, 202) #assert old metadata are still there headers = self.client.retrieve_object_metadata(self.containers[0], - self.obj['name'], + self.obj[0]['name'], restricted=True) #assert new metadata have been updated for k,v in more.items(): @@ -1407,19 +1488,19 @@ class ObjectPost(BaseTestCase): last_byte_pos=499, instance_length = True, content_length = 500): - l = len(self.obj['data']) - length = l if instance_length else '*' + l = len(self.obj[0]['data']) range = 'bytes %d-%d/%s' %(first_byte_pos, last_byte_pos, - length) + l if instance_length else '*') partial = last_byte_pos - first_byte_pos + 1 + length = first_byte_pos + partial data = get_random_data(partial) args = {'content_type':'application/octet-stream', 'content_range':'%s' %range} if content_length: args['content_length'] = content_length - status = self.client.update_object(self.containers[0], self.obj['name'], + status = self.client.update_object(self.containers[0], self.obj[0]['name'], StringIO(data), **args)[0] if partial < 0 or (instance_length and l <= last_byte_pos): @@ -1428,87 +1509,194 @@ class ObjectPost(BaseTestCase): self.assertEqual(status, 204) #check modified object content = self.client.retrieve_object(self.containers[0], - self.obj['name']) - self.assertEqual(content[0:partial], data) - self.assertEqual(content[partial:l], self.obj['data'][partial:l]) + self.obj[0]['name']) + self.assertEqual(content[:first_byte_pos], self.obj[0]['data'][:first_byte_pos]) + self.assertEqual(content[first_byte_pos:last_byte_pos+1], data) + self.assertEqual(content[last_byte_pos+1:], self.obj[0]['data'][last_byte_pos+1:]) + + def test_update_object_lt_blocksize(self): + self.test_update_object(10, 20, content_length=None) + + def test_update_object_gt_blocksize(self): + o = self.upload_random_data(self.containers[0], o_names[1], + length=4*1024*1024+5) + c = self.containers[0] + o_name = o['name'] + o_data = o['data'] + first_byte_pos = 4*1024*1024+1 + last_byte_pos = 4*1024*1024+4 + l = last_byte_pos - first_byte_pos + 1 + data = get_random_data(l) + range = 'bytes %d-%d/*' %(first_byte_pos, last_byte_pos) + self.client.update_object(c, o_name, StringIO(data), content_range=range) + content = self.client.retrieve_object(c, o_name) + self.assertEqual(content[:first_byte_pos], o_data[:first_byte_pos]) + self.assertEqual(content[first_byte_pos:last_byte_pos+1], data) + self.assertEqual(content[last_byte_pos+1:], o_data[last_byte_pos+1:]) + + def test_update_object_divided_by_blocksize(self): + o = self.upload_random_data(self.containers[0], o_names[1], + length=4*1024*1024+5) + c = self.containers[0] + o_name = o['name'] + o_data = o['data'] + first_byte_pos = 4*1024*1024 + last_byte_pos = 5*1024*1024 + l = last_byte_pos - first_byte_pos + 1 + data = get_random_data(l) + range = 'bytes %d-%d/*' %(first_byte_pos, last_byte_pos) + self.client.update_object(c, o_name, StringIO(data), content_range=range) + content = self.client.retrieve_object(c, o_name) + self.assertEqual(content[:first_byte_pos], o_data[:first_byte_pos]) + self.assertEqual(content[first_byte_pos:last_byte_pos+1], data) + self.assertEqual(content[last_byte_pos+1:], o_data[last_byte_pos+1:]) def test_update_object_no_content_length(self): self.test_update_object(content_length = None) def test_update_object_invalid_content_length(self): with AssertContentInvariant(self.client.retrieve_object, - self.containers[0], self.obj['name']): + self.containers[0], self.obj[0]['name']): self.assert_raises_fault(400, self.test_update_object, content_length = 1000) def test_update_object_invalid_range(self): with AssertContentInvariant(self.client.retrieve_object, - self.containers[0], self.obj['name']): + self.containers[0], self.obj[0]['name']): self.assert_raises_fault(416, self.test_update_object, 499, 0, True) def test_update_object_invalid_range_and_length(self): with AssertContentInvariant(self.client.retrieve_object, - self.containers[0], self.obj['name']): - self.assert_raises_fault(416, self.test_update_object, 499, 0, True, + self.containers[0], self.obj[0]['name']): + self.assert_raises_fault([400, 416], self.test_update_object, 499, 0, True, -1) def test_update_object_invalid_range_with_no_content_length(self): with AssertContentInvariant(self.client.retrieve_object, - self.containers[0], self.obj['name']): + self.containers[0], self.obj[0]['name']): self.assert_raises_fault(416, self.test_update_object, 499, 0, True, content_length = None) def test_update_object_out_of_limits(self): with AssertContentInvariant(self.client.retrieve_object, - self.containers[0], self.obj['name']): - l = len(self.obj['data']) + self.containers[0], self.obj[0]['name']): + l = len(self.obj[0]['data']) self.assert_raises_fault(416, self.test_update_object, 0, l+1, True) def test_append(self): data = get_random_data(500) headers = {} - self.client.update_object(self.containers[0], self.obj['name'], + self.client.update_object(self.containers[0], self.obj[0]['name'], StringIO(data), content_length=500, content_type='application/octet-stream') content = self.client.retrieve_object(self.containers[0], - self.obj['name']) - self.assertEqual(len(content), len(self.obj['data']) + 500) - self.assertEqual(content[:-500], self.obj['data']) + self.obj[0]['name']) + self.assertEqual(len(content), len(self.obj[0]['data']) + 500) + self.assertEqual(content[:-500], self.obj[0]['data']) def test_update_with_chunked_transfer(self): data = get_random_data(500) dl = len(data) - fl = len(self.obj['data']) + fl = len(self.obj[0]['data']) self.client.update_object_using_chunks(self.containers[0], - self.obj['name'], StringIO(data), + self.obj[0]['name'], + StringIO(data), offset=0, content_type='application/octet-stream') #check modified object content = self.client.retrieve_object(self.containers[0], - self.obj['name']) + self.obj[0]['name']) self.assertEqual(content[0:dl], data) - self.assertEqual(content[dl:fl], self.obj['data'][dl:fl]) + self.assertEqual(content[dl:fl], self.obj[0]['data'][dl:fl]) def test_update_from_other_object(self): - data = self.upload_random_data(self.containers[0], o_names[1])['data'] - source_object = '/%s/%s' % (self.containers[0], o_names[0]) - self.client.update_from_other_source(self.containers[0], o_names[1], - source_object) + c = self.containers[0] + src = o_names[0] + dest = 'object' + + source_data = self.client.retrieve_object(c, src) + source_meta = self.client.retrieve_object_metadata(c, src) + source_hash = self.client.retrieve_object_hashmap(c, src) + + #update zero length object + self.client.create_zero_length_object(c, dest) + source_object = '/%s/%s' % (c, src) + self.client.update_from_other_source(c, dest, source_object) + dest_data = self.client.retrieve_object(c, src) + dest_meta = self.client.retrieve_object_metadata(c, dest) + dest_hash = self.client.retrieve_object_hashmap(c, src) + self.assertEqual(source_data, dest_data) + self.assertEqual(source_hash, dest_hash) + + #test append + self.client.update_from_other_source(c, dest, source_object) + content = self.client.retrieve_object(c, dest) + self.assertEqual(source_data * 2, content) + + def test_update_range_from_other_object(self): + c = self.containers[0] + dest = 'object' + + #test update range + src = self.obj[1]['name'] + src_data = self.client.retrieve_object(c, src) + + #update zero length object + prev_data = self.upload_random_data(c, dest, length=4*1024*1024+10)['data'] + source_object = '/%s/%s' % (c, src) + first_byte_pos = 4*1024*1024+1 + last_byte_pos = 4*1024*1024+4 + range = 'bytes %d-%d/*' %(first_byte_pos, last_byte_pos) + self.client.update_from_other_source(c, dest, source_object, + content_range=range) + content = self.client.retrieve_object(c, dest) + self.assertEqual(content[:first_byte_pos], prev_data[:first_byte_pos]) + self.assertEqual(content[first_byte_pos:last_byte_pos+1], src_data[:last_byte_pos - first_byte_pos + 1]) + self.assertEqual(content[last_byte_pos+1:], prev_data[last_byte_pos+1:]) + + def test_update_hashes_from_other_object(self): + c = self.containers[0] + dest = 'object' + + #test update range + src_data = self.upload_random_data(c, o_names[0], length=1024*1024+10)['data'] + + #update zero length object + prev_data = self.upload_random_data(c, dest, length=5*1024*1024+10)['data'] + source_object = '/%s/%s' % (c, o_names[0]) + first_byte_pos = 4*1024*1024 + last_byte_pos = 5*1024*1024 + range = 'bytes %d-%d/*' %(first_byte_pos, last_byte_pos) + self.client.update_from_other_source(c, dest, source_object, + content_range=range) + content = self.client.retrieve_object(c, dest) + self.assertEqual(content[:first_byte_pos], prev_data[:first_byte_pos]) + self.assertEqual(content[first_byte_pos:last_byte_pos+1], src_data[:last_byte_pos - first_byte_pos + 1]) + self.assertEqual(content[last_byte_pos+1:], prev_data[last_byte_pos+1:]) + + + def test_update_zero_length_object(self): + c = self.containers[0] + o = 'object' + other = 'other' + zero = self.client.create_zero_length_object(c, o) - self.assertEqual( - self.client.retrieve_object(self.containers[0], o_names[1]), - '%s%s' % (data, - self.client.retrieve_object(self.containers[0], o_names[0]))) - + data = get_random_data() + self.client.update_object(c, o, StringIO(data)) + self.client.create_object(c, other, StringIO(data)) + + self.assertEqual(self.client.retrieve_object(c, o), + self.client.retrieve_object(c, other)) + + self.assertEqual(self.client.retrieve_object_hashmap(c, o), + self.client.retrieve_object_hashmap(c, other)) - class ObjectDelete(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) - self.account = 'test' self.containers = ['c1', 'c2'] for c in self.containers: self.client.create_container(c) @@ -1526,28 +1714,36 @@ class ObjectDelete(BaseTestCase): class ListSharing(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) + for i in range(2): + self.client.create_container('c%s' %i) self.client.create_container('c') for i in range(2): - self.upload_random_data('c', 'o%s' %i) + self.upload_random_data('c1', 'o%s' %i) accounts = OTHER_ACCOUNTS.copy() self.o1_sharing_with = accounts.popitem() self.o1_sharing = [self.o1_sharing_with[1]] - self.client.share_object('c', 'o1', self.o1_sharing, read=True) + self.client.share_object('c1', 'o1', self.o1_sharing, read=True) l = [] for i in range(2): l.append(accounts.popitem()) - #self.client.set_account_groups({'pithos-dev':'chazapis,verigak,papagian'}) - #self.o2_sharing = 'write=%s' % - #self.client.share_object('c', 'o2', self.o2_sharing) - def test_listing(self): + def test_list_other_shared(self): self.other = Pithos_Client(get_server(), self.o1_sharing_with[0], self.o1_sharing_with[1], get_api()) - self.assertTrue('test' in self.other.list_shared_by_others()) - + self.assertTrue(get_user() in self.other.list_shared_by_others()) + + def test_list_my_shared(self): + my_shared_containers = self.client.list_containers(shared=True) + self.assertTrue('c1' in my_shared_containers) + self.assertTrue('c2' not in my_shared_containers) + + my_shared_objects = self.client.list_objects('c1', shared=True) + self.assertTrue('o1' in my_shared_objects) + self.assertTrue('o2' not in my_shared_objects) + class TestGreek(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) @@ -1688,7 +1884,7 @@ class TestGreek(BaseTestCase): #check read access self.client.create_container('φάκελος') o = self.upload_random_data('φάκελος', 'ο1') - self.client.share_object('φάκελος', 'ο1', ['test:σεφς']) + self.client.share_object('φάκελος', 'ο1', ['%s:σεφς' % get_user()]) chef = Pithos_Client(get_server(), '0009', 'διογένης', @@ -1748,7 +1944,7 @@ class TestPermissions(BaseTestCase): self.initial_meta = self.client.retrieve_account_metadata(restricted=True) #create a group - self.authorized = ['chazapis', 'verigak', 'gtsouk', 'papagian'] + self.authorized = ['chazapis', 'verigak', 'gtsouk'] groups = {'pithosdev':','.join(self.authorized)} self.client.set_account_groups(**groups) @@ -1769,38 +1965,124 @@ class TestPermissions(BaseTestCase): BaseTestCase.tearDown(self) - def test_read_access(self): - self.client.create_container('c') - o = self.upload_random_data('c', 'o') - self.client.share_object('c', 'o', ['test:pithosdev']) + def assert_read(self, authorized=[], any=False): for token, account in OTHER_ACCOUNTS.items(): cl = Pithos_Client(get_server(), token, account, get_api()) - if account in self.authorized: + if account in authorized or any: self.assert_not_raises_fault(401, cl.retrieve_object_metadata, 'c', 'o', account=get_user()) else: self.assert_raises_fault(401, cl.retrieve_object_metadata, 'c', 'o', account=get_user()) + #check inheritance + o = self.upload_random_data('c', 'o/also-shared') + for token, account in OTHER_ACCOUNTS.items(): + cl = Pithos_Client(get_server(), token, account, get_api()) + if account in authorized or any: + self.assert_not_raises_fault(401, cl.retrieve_object_metadata, + 'c', 'o/also-shared', account=get_user()) + else: + self.assert_raises_fault(401, cl.retrieve_object_metadata, + 'c', 'o/also-shared', account=get_user()) - def test_write_access(self): - self.client.create_container('c') - o = self.upload_random_data('c', 'o') - self.client.share_object('c', 'o', ['chazapis'], read=False) + def assert_write(self, o_data, authorized=[], any=False): for token, account in OTHER_ACCOUNTS.items(): cl = Pithos_Client(get_server(), token, account, get_api()) new_data = get_random_data() - if account == 'chazapis': + if account in authorized or any: + # test write access self.assert_not_raises_fault(401, cl.update_object, 'c', 'o', StringIO(new_data), account=get_user()) - server_data = self.client.retrieve_object('c', 'o') - self.assertEqual(o['data'], server_data[:len(o['data'])]) - self.assertEqual(new_data, server_data[len(o['data']):]) + try: + # test read access + server_data = cl.retrieve_object('c', 'o', account=get_user()) + self.assertEqual(o_data, server_data[:len(o_data)]) + self.assertEqual(new_data, server_data[len(o_data):]) + o_data = server_data + except Fault, f: + self.failIf(f.status == 401) else: self.assert_raises_fault(401, cl.update_object, 'c', 'o', StringIO(new_data), account=get_user()) + + #check inheritance + o = self.upload_random_data('c', 'o/also-shared') + o_data = o['data'] + for token, account in OTHER_ACCOUNTS.items(): + cl = Pithos_Client(get_server(), token, account, get_api()) + new_data = get_random_data() + if account in authorized or any: + # test write access + self.assert_not_raises_fault(401, cl.update_object, + 'c', o['name'], + StringIO(new_data), + account=get_user()) + try: + server_data = cl.retrieve_object('c', o['name'], account=get_user()) + self.assertEqual(o_data, server_data[:len(o_data)]) + self.assertEqual(new_data, server_data[len(o_data):]) + o_data = server_data + except Fault, f: + self.failIf(f.status == 401) + else: + self.assert_raises_fault(401, cl.update_object, + 'c', o['name'], + StringIO(new_data), + account=get_user()) + + def test_group_read(self): + self.client.create_container('c') + o = self.upload_random_data('c', 'o') + self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()]) + self.assert_read(authorized=self.authorized) + + def test_read_many(self): + #test read access + self.client.create_container('c') + o = self.upload_random_data('c', 'o') + self.client.share_object('c', 'o', self.authorized) + self.assert_read(authorized=self.authorized) + + def test_read_by_everyone(self): + self.client.create_container('c') + o = self.upload_random_data('c', 'o') + self.client.share_object('c', 'o', ['*']) + self.assert_read(any=True) + + def test_group_write(self): + self.client.create_container('c') + o = self.upload_random_data('c', 'o') + self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()], read=False) + self.assert_write(o['data'], authorized=self.authorized) + + def test_write_many(self): + self.client.create_container('c') + o = self.upload_random_data('c', 'o') + self.client.share_object('c', 'o', self.authorized, read=False) + self.assert_write(o['data'], authorized=self.authorized) + + def test_write_by_everyone(self): + self.client.create_container('c') + o = self.upload_random_data('c', 'o') + self.client.share_object('c', 'o', ['*'], read=False) + o_data = o['data'] + self.assert_write(o['data'], any=True) + +class TestPublish(BaseTestCase): + def test_publish(self): + self.client.create_container('c') + o_data = self.upload_random_data('c', 'o')['data'] + self.client.publish_object('c', 'o') + meta = self.client.retrieve_object_metadata('c', 'o') + self.assertTrue('x-object-public' in meta) + url = '/public/%s/c/o' % get_user() + self.assertEqual(meta['x-object-public'], url) + public_client = Pithos_Client(get_server(), get_auth(), get_user(), api='') + data = public_client.get(url)[2] + self.assertEqual(o_data, data) class AssertMappingInvariant(object): def __init__(self, callable, *args, **kwargs): @@ -1886,4 +2168,7 @@ o_names = ['kate.jpg', 'photos/me.jpg'] if __name__ == "__main__": - unittest.main() + if get_user() == 'test': + unittest.main() + else: + print 'Will not run tests as any other user except \'test\' (current user: %s).' % get_user()