r = callableObj(*args, **kwargs)
self.fail('Should never reach here')
except Fault, f:
- self.failUnless(f.status == status)
+ if type(status) == types.ListType:
+ self.failUnless(f.status in status)
+ else:
+ self.failUnless(f.status == status)
def assert_not_raises_fault(self, status, callableObj, *args, **kwargs):
"""
self.assert_raises_fault(404, self.client.retrieve_object_metadata,
container, object)
+ def assert_versionlist_structure(self, versionlist):
+ self.assertTrue(type(versionlist) == types.ListType)
+ for elem in versionlist:
+ self.assertTrue(type(elem) == types.ListType)
+ self.assertEqual(len(elem), 2)
+
def upload_random_data(self, container, name, length=1024, type=None,
enc=None, **meta):
data = get_random_data(length)
obj['meta'] = args
path = '/%s/%s' % (container, name)
- self.client.create_object(container, name, StringIO(obj['data']),
- meta, **args)
+ self.client.create_object(container, name, f=StringIO(obj['data']),
+ meta=meta, **args)
return obj
except IOError:
class AccountHead(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
for item in self.containers:
self.client.create_container(item)
class AccountGet(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
#create some containers
self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
for item in self.containers:
class AccountPost(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
for item in self.containers:
self.client.create_container(item)
groups = {'pithosdev':'verigak,gtsouk,chazapis'}
self.client.set_account_groups(**groups)
- self.assertEqual(groups, self.client.retrieve_account_groups())
+ self.assertEqual(set(groups['pithosdev']),
+ set(self.client.retrieve_account_groups()['pithosdev']))
more_groups = {'clientsdev':'pkanavos,mvasilak'}
self.client.set_account_groups(**more_groups)
groups.update(more_groups)
- self.assertEqual(set(groups['pithosdev']),
- set(self.client.retrieve_account_groups()['pithosdev']))
+ self.assertEqual(set(groups['clientsdev']),
+ set(self.client.retrieve_account_groups()['clientsdev']))
def test_reset_account_groups(self):
with AssertMappingInvariant(self.client.retrieve_account_metadata):
'clientsdev':'pkanavos,mvasilak'}
self.client.set_account_groups(**groups)
- self.assertEqual(groups, self.client.retrieve_account_groups())
+ self.assertEqual(set(groups['pithosdev'].split(',')),
+ set(self.client.retrieve_account_groups()['pithosdev'].split(',')))
+ self.assertEqual(set(groups['clientsdev'].split(',')),
+ set(self.client.retrieve_account_groups()['clientsdev'].split(',')))
groups = {'pithosdev':'verigak,gtsouk,chazapis,papagian'}
self.client.reset_account_groups(**groups)
- self.assertEqual(groups['pithosdev'].split(','),
- self.client.retrieve_account_groups()['pithosdev'].split(','))
+ self.assertEqual(set(groups['pithosdev'].split(',')),
+ set(self.client.retrieve_account_groups()['pithosdev'].split(',')))
def test_delete_account_groups(self):
with AssertMappingInvariant(self.client.retrieve_account_metadata):
class ContainerHead(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.container = 'apples'
self.client.create_container(self.container)
class ContainerGet(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.container = ['pears', 'apples']
for c in self.container:
self.client.create_container(c)
class ContainerPut(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.containers = ['c1', 'c2']
def test_create(self):
class ContainerPost(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.container = 'apples'
self.client.create_container(self.container)
class ContainerDelete(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.containers = ['c1', 'c2']
for c in self.containers:
self.client.create_container(c)
class ObjectGet(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.containers = ['c1', 'c2']
#create some containers
for c in self.containers:
for n in names:
self.objects.append(self.upload_random_data(self.containers[1], n))
+ def test_versions(self):
+ c = self.containers[1]
+ o = self.objects[0]
+ b = self.client.retrieve_object_versionlist(c, o['name'])['versions']
+ self.assert_versionlist_structure(b)
+
+ #update meta
+ meta = {'quality':'AAA', 'stock':True}
+ self.client.update_object_metadata(c, o['name'], **meta)
+
+ a = self.client.retrieve_object_versionlist(c, o['name'])['versions']
+ self.assert_versionlist_structure(a)
+ self.assertEqual(len(b)+1, len(a))
+ self.assertEqual(b, a[:-1])
+
+ #get exact previous version metadata
+ v = a[-2][0]
+ v_meta = self.client.retrieve_object_metadata(c, o['name'],
+ restricted=True,
+ version=v)
+ for k in meta.keys():
+ self.assertTrue(k not in v_meta)
+
+ #update obejct
+ data = get_random_data()
+ self.client.update_object(c, o['name'], StringIO(data))
+
+ aa = self.client.retrieve_object_versionlist(c, o['name'])['versions']
+ self.assert_versionlist_structure(aa)
+ self.assertEqual(len(a)+1, len(aa))
+ self.assertEqual(a, aa[:-1])
+
+ #get exact previous version
+ v = aa[-3][0]
+ v_data = self.client.retrieve_object_version(c, o['name'], version=v)
+ self.assertEqual(o['data'], v_data)
+ self.assertEqual(self.client.retrieve_object(c, o['name']),
+ '%s%s' %(v_data, data))
+
def test_get(self):
#perform get
o = self.client.retrieve_object(self.containers[1],
fname = 'largefile'
o = self.upload_random_data(self.containers[1], fname, l)
if o:
- data = self.client.retrieve_object(self.containers[1], fname,
+ body = self.client.retrieve_object(self.containers[1], fname,
format='json')
- body = json.loads(data)
hashes = body['hashes']
block_size = body['block_size']
block_hash = body['block_hash']
class ObjectPut(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.container = 'c1'
self.client.create_container(self.container)
self.assertEqual('', self.client.retrieve_object(self.container,
'large-object'))
+ def test_create_zero_length_object(self):
+ c = self.container
+ o = 'object'
+ zero = self.client.create_zero_length_object(c, o)
+ zero_meta = self.client.retrieve_object_metadata(c, o)
+ zero_hash = self.client.retrieve_object_hashmap(c, o)
+ zero_data = self.client.retrieve_object(c, o)
+
+ self.assertEqual(int(zero_meta['content-length']), 0)
+ self.assertEqual(zero_hash, [])
+ self.assertEqual(zero_data, '')
+
+ def test_create_object_by_hashmap(self):
+ c = self.container
+ o = 'object'
+ self.upload_random_data(c, o)
+ hashmap = self.client.retrieve_object(c, o, format='json')
+ o2 = 'object-copy'
+ self.client.create_object_by_hashmap(c, o2, hashmap)
+ self.assertEqual(self.client.retrieve_object(c, o),
+ self.client.retrieve_object(c, o))
+
class ObjectCopy(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.containers = ['c1', 'c2']
for c in self.containers:
self.client.create_container(c)
self.obj = self.upload_random_data(self.containers[0], o_names[0])
-
- def tearDown(self):
- pass
-
+
def test_copy(self):
with AssertMappingInvariant(self.client.retrieve_object_metadata,
self.containers[0], self.obj['name']):
class ObjectMove(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.containers = ['c1', 'c2']
for c in self.containers:
self.client.create_container(c)
class ObjectPost(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.containers = ['c1', 'c2']
for c in self.containers:
self.client.create_container(c)
- self.obj = self.upload_random_data(self.containers[0], o_names[0])
+ self.obj = []
+ for i in range(2):
+ self.obj.append(self.upload_random_data(self.containers[0], o_names[i]))
def test_update_meta(self):
#perform update metadata
more = {'foo':'foo', 'bar':'bar'}
status = self.client.update_object_metadata(self.containers[0],
- self.obj['name'],
+ self.obj[0]['name'],
**more)[0]
#assert request accepted
self.assertEqual(status, 202)
#assert old metadata are still there
headers = self.client.retrieve_object_metadata(self.containers[0],
- self.obj['name'],
+ self.obj[0]['name'],
restricted=True)
#assert new metadata have been updated
for k,v in more.items():
last_byte_pos=499,
instance_length = True,
content_length = 500):
- l = len(self.obj['data'])
- length = l if instance_length else '*'
+ l = len(self.obj[0]['data'])
range = 'bytes %d-%d/%s' %(first_byte_pos,
last_byte_pos,
- length)
+ l if instance_length else '*')
partial = last_byte_pos - first_byte_pos + 1
+ length = first_byte_pos + partial
data = get_random_data(partial)
args = {'content_type':'application/octet-stream',
'content_range':'%s' %range}
if content_length:
args['content_length'] = content_length
- status = self.client.update_object(self.containers[0], self.obj['name'],
+ status = self.client.update_object(self.containers[0], self.obj[0]['name'],
StringIO(data), **args)[0]
if partial < 0 or (instance_length and l <= last_byte_pos):
self.assertEqual(status, 204)
#check modified object
content = self.client.retrieve_object(self.containers[0],
- self.obj['name'])
- self.assertEqual(content[0:partial], data)
- self.assertEqual(content[partial:l], self.obj['data'][partial:l])
+ self.obj[0]['name'])
+ self.assertEqual(content[:first_byte_pos], self.obj[0]['data'][:first_byte_pos])
+ self.assertEqual(content[first_byte_pos:last_byte_pos+1], data)
+ self.assertEqual(content[last_byte_pos+1:], self.obj[0]['data'][last_byte_pos+1:])
+
+ def test_update_object_lt_blocksize(self):
+ self.test_update_object(10, 20, content_length=None)
+
+ def test_update_object_gt_blocksize(self):
+ o = self.upload_random_data(self.containers[0], o_names[1],
+ length=4*1024*1024+5)
+ c = self.containers[0]
+ o_name = o['name']
+ o_data = o['data']
+ first_byte_pos = 4*1024*1024+1
+ last_byte_pos = 4*1024*1024+4
+ l = last_byte_pos - first_byte_pos + 1
+ data = get_random_data(l)
+ range = 'bytes %d-%d/*' %(first_byte_pos, last_byte_pos)
+ self.client.update_object(c, o_name, StringIO(data), content_range=range)
+ content = self.client.retrieve_object(c, o_name)
+ self.assertEqual(content[:first_byte_pos], o_data[:first_byte_pos])
+ self.assertEqual(content[first_byte_pos:last_byte_pos+1], data)
+ self.assertEqual(content[last_byte_pos+1:], o_data[last_byte_pos+1:])
+
+ def test_update_object_divided_by_blocksize(self):
+ o = self.upload_random_data(self.containers[0], o_names[1],
+ length=4*1024*1024+5)
+ c = self.containers[0]
+ o_name = o['name']
+ o_data = o['data']
+ first_byte_pos = 4*1024*1024
+ last_byte_pos = 5*1024*1024
+ l = last_byte_pos - first_byte_pos + 1
+ data = get_random_data(l)
+ range = 'bytes %d-%d/*' %(first_byte_pos, last_byte_pos)
+ self.client.update_object(c, o_name, StringIO(data), content_range=range)
+ content = self.client.retrieve_object(c, o_name)
+ self.assertEqual(content[:first_byte_pos], o_data[:first_byte_pos])
+ self.assertEqual(content[first_byte_pos:last_byte_pos+1], data)
+ self.assertEqual(content[last_byte_pos+1:], o_data[last_byte_pos+1:])
def test_update_object_no_content_length(self):
self.test_update_object(content_length = None)
def test_update_object_invalid_content_length(self):
with AssertContentInvariant(self.client.retrieve_object,
- self.containers[0], self.obj['name']):
+ self.containers[0], self.obj[0]['name']):
self.assert_raises_fault(400, self.test_update_object,
content_length = 1000)
def test_update_object_invalid_range(self):
with AssertContentInvariant(self.client.retrieve_object,
- self.containers[0], self.obj['name']):
+ self.containers[0], self.obj[0]['name']):
self.assert_raises_fault(416, self.test_update_object, 499, 0, True)
def test_update_object_invalid_range_and_length(self):
with AssertContentInvariant(self.client.retrieve_object,
- self.containers[0], self.obj['name']):
- self.assert_raises_fault(416, self.test_update_object, 499, 0, True,
+ self.containers[0], self.obj[0]['name']):
+ self.assert_raises_fault([400, 416], self.test_update_object, 499, 0, True,
-1)
def test_update_object_invalid_range_with_no_content_length(self):
with AssertContentInvariant(self.client.retrieve_object,
- self.containers[0], self.obj['name']):
+ self.containers[0], self.obj[0]['name']):
self.assert_raises_fault(416, self.test_update_object, 499, 0, True,
content_length = None)
def test_update_object_out_of_limits(self):
with AssertContentInvariant(self.client.retrieve_object,
- self.containers[0], self.obj['name']):
- l = len(self.obj['data'])
+ self.containers[0], self.obj[0]['name']):
+ l = len(self.obj[0]['data'])
self.assert_raises_fault(416, self.test_update_object, 0, l+1, True)
def test_append(self):
data = get_random_data(500)
headers = {}
- self.client.update_object(self.containers[0], self.obj['name'],
+ self.client.update_object(self.containers[0], self.obj[0]['name'],
StringIO(data), content_length=500,
content_type='application/octet-stream')
content = self.client.retrieve_object(self.containers[0],
- self.obj['name'])
- self.assertEqual(len(content), len(self.obj['data']) + 500)
- self.assertEqual(content[:-500], self.obj['data'])
+ self.obj[0]['name'])
+ self.assertEqual(len(content), len(self.obj[0]['data']) + 500)
+ self.assertEqual(content[:-500], self.obj[0]['data'])
def test_update_with_chunked_transfer(self):
data = get_random_data(500)
dl = len(data)
- fl = len(self.obj['data'])
+ fl = len(self.obj[0]['data'])
self.client.update_object_using_chunks(self.containers[0],
- self.obj['name'], StringIO(data),
+ self.obj[0]['name'],
+ StringIO(data),
offset=0,
content_type='application/octet-stream')
#check modified object
content = self.client.retrieve_object(self.containers[0],
- self.obj['name'])
+ self.obj[0]['name'])
self.assertEqual(content[0:dl], data)
- self.assertEqual(content[dl:fl], self.obj['data'][dl:fl])
+ self.assertEqual(content[dl:fl], self.obj[0]['data'][dl:fl])
def test_update_from_other_object(self):
- data = self.upload_random_data(self.containers[0], o_names[1])['data']
- source_object = '/%s/%s' % (self.containers[0], o_names[0])
- self.client.update_from_other_source(self.containers[0], o_names[1],
- source_object)
+ c = self.containers[0]
+ src = o_names[0]
+ dest = 'object'
+
+ source_data = self.client.retrieve_object(c, src)
+ source_meta = self.client.retrieve_object_metadata(c, src)
+ source_hash = self.client.retrieve_object_hashmap(c, src)
+
+ #update zero length object
+ self.client.create_zero_length_object(c, dest)
+ source_object = '/%s/%s' % (c, src)
+ self.client.update_from_other_source(c, dest, source_object)
+ dest_data = self.client.retrieve_object(c, src)
+ dest_meta = self.client.retrieve_object_metadata(c, dest)
+ dest_hash = self.client.retrieve_object_hashmap(c, src)
+ self.assertEqual(source_data, dest_data)
+ self.assertEqual(source_hash, dest_hash)
+
+ #test append
+ self.client.update_from_other_source(c, dest, source_object)
+ content = self.client.retrieve_object(c, dest)
+ self.assertEqual(source_data * 2, content)
+
+ def test_update_range_from_other_object(self):
+ c = self.containers[0]
+ dest = 'object'
+
+ #test update range
+ src = self.obj[1]['name']
+ src_data = self.client.retrieve_object(c, src)
+
+ #update zero length object
+ prev_data = self.upload_random_data(c, dest, length=4*1024*1024+10)['data']
+ source_object = '/%s/%s' % (c, src)
+ first_byte_pos = 4*1024*1024+1
+ last_byte_pos = 4*1024*1024+4
+ range = 'bytes %d-%d/*' %(first_byte_pos, last_byte_pos)
+ self.client.update_from_other_source(c, dest, source_object,
+ content_range=range)
+ content = self.client.retrieve_object(c, dest)
+ self.assertEqual(content[:first_byte_pos], prev_data[:first_byte_pos])
+ self.assertEqual(content[first_byte_pos:last_byte_pos+1], src_data[:last_byte_pos - first_byte_pos + 1])
+ self.assertEqual(content[last_byte_pos+1:], prev_data[last_byte_pos+1:])
+
+ def test_update_hashes_from_other_object(self):
+ c = self.containers[0]
+ dest = 'object'
+
+ #test update range
+ src_data = self.upload_random_data(c, o_names[0], length=1024*1024+10)['data']
+
+ #update zero length object
+ prev_data = self.upload_random_data(c, dest, length=5*1024*1024+10)['data']
+ source_object = '/%s/%s' % (c, o_names[0])
+ first_byte_pos = 4*1024*1024
+ last_byte_pos = 5*1024*1024
+ range = 'bytes %d-%d/*' %(first_byte_pos, last_byte_pos)
+ self.client.update_from_other_source(c, dest, source_object,
+ content_range=range)
+ content = self.client.retrieve_object(c, dest)
+ self.assertEqual(content[:first_byte_pos], prev_data[:first_byte_pos])
+ self.assertEqual(content[first_byte_pos:last_byte_pos+1], src_data[:last_byte_pos - first_byte_pos + 1])
+ self.assertEqual(content[last_byte_pos+1:], prev_data[last_byte_pos+1:])
+
+
+ def test_update_zero_length_object(self):
+ c = self.containers[0]
+ o = 'object'
+ other = 'other'
+ zero = self.client.create_zero_length_object(c, o)
- self.assertEqual(
- self.client.retrieve_object(self.containers[0], o_names[1]),
- '%s%s' % (data,
- self.client.retrieve_object(self.containers[0], o_names[0])))
-
+ data = get_random_data()
+ self.client.update_object(c, o, StringIO(data))
+ self.client.create_object(c, other, StringIO(data))
+
+ self.assertEqual(self.client.retrieve_object(c, o),
+ self.client.retrieve_object(c, other))
+
+ self.assertEqual(self.client.retrieve_object_hashmap(c, o),
+ self.client.retrieve_object_hashmap(c, other))
-
class ObjectDelete(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.account = 'test'
self.containers = ['c1', 'c2']
for c in self.containers:
self.client.create_container(c)
class ListSharing(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
+ for i in range(2):
+ self.client.create_container('c%s' %i)
self.client.create_container('c')
for i in range(2):
- self.upload_random_data('c', 'o%s' %i)
+ self.upload_random_data('c1', 'o%s' %i)
accounts = OTHER_ACCOUNTS.copy()
self.o1_sharing_with = accounts.popitem()
self.o1_sharing = [self.o1_sharing_with[1]]
- self.client.share_object('c', 'o1', self.o1_sharing, read=True)
+ self.client.share_object('c1', 'o1', self.o1_sharing, read=True)
l = []
for i in range(2):
l.append(accounts.popitem())
- #self.client.set_account_groups({'pithos-dev':'chazapis,verigak,papagian'})
- #self.o2_sharing = 'write=%s' %
- #self.client.share_object('c', 'o2', self.o2_sharing)
- def test_listing(self):
+ def test_list_other_shared(self):
self.other = Pithos_Client(get_server(),
self.o1_sharing_with[0],
self.o1_sharing_with[1],
get_api())
- self.assertTrue('test' in self.other.list_shared_by_others())
-
+ self.assertTrue(get_user() in self.other.list_shared_by_others())
+
+ def test_list_my_shared(self):
+ my_shared_containers = self.client.list_containers(shared=True)
+ self.assertTrue('c1' in my_shared_containers)
+ self.assertTrue('c2' not in my_shared_containers)
+
+ my_shared_objects = self.client.list_objects('c1', shared=True)
+ self.assertTrue('o1' in my_shared_objects)
+ self.assertTrue('o2' not in my_shared_objects)
+
class TestGreek(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
#check read access
self.client.create_container('φάκελος')
o = self.upload_random_data('φάκελος', 'ο1')
- self.client.share_object('φάκελος', 'ο1', ['test:σεφς'])
+ self.client.share_object('φάκελος', 'ο1', ['%s:σεφς' % get_user()])
chef = Pithos_Client(get_server(),
'0009',
'διογένης',
self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
#create a group
- self.authorized = ['chazapis', 'verigak', 'gtsouk', 'papagian']
+ self.authorized = ['chazapis', 'verigak', 'gtsouk']
groups = {'pithosdev':','.join(self.authorized)}
self.client.set_account_groups(**groups)
BaseTestCase.tearDown(self)
- def test_read_access(self):
- self.client.create_container('c')
- o = self.upload_random_data('c', 'o')
- self.client.share_object('c', 'o', ['test:pithosdev'])
+ def assert_read(self, authorized=[], any=False):
for token, account in OTHER_ACCOUNTS.items():
cl = Pithos_Client(get_server(), token, account, get_api())
- if account in self.authorized:
+ if account in authorized or any:
self.assert_not_raises_fault(401, cl.retrieve_object_metadata,
'c', 'o', account=get_user())
else:
self.assert_raises_fault(401, cl.retrieve_object_metadata,
'c', 'o', account=get_user())
+ #check inheritance
+ o = self.upload_random_data('c', 'o/also-shared')
+ for token, account in OTHER_ACCOUNTS.items():
+ cl = Pithos_Client(get_server(), token, account, get_api())
+ if account in authorized or any:
+ self.assert_not_raises_fault(401, cl.retrieve_object_metadata,
+ 'c', 'o/also-shared', account=get_user())
+ else:
+ self.assert_raises_fault(401, cl.retrieve_object_metadata,
+ 'c', 'o/also-shared', account=get_user())
- def test_write_access(self):
- self.client.create_container('c')
- o = self.upload_random_data('c', 'o')
- self.client.share_object('c', 'o', ['chazapis'], read=False)
+ def assert_write(self, o_data, authorized=[], any=False):
for token, account in OTHER_ACCOUNTS.items():
cl = Pithos_Client(get_server(), token, account, get_api())
new_data = get_random_data()
- if account == 'chazapis':
+ if account in authorized or any:
+ # test write access
self.assert_not_raises_fault(401, cl.update_object,
'c', 'o', StringIO(new_data),
account=get_user())
- server_data = self.client.retrieve_object('c', 'o')
- self.assertEqual(o['data'], server_data[:len(o['data'])])
- self.assertEqual(new_data, server_data[len(o['data']):])
+ try:
+ # test read access
+ server_data = cl.retrieve_object('c', 'o', account=get_user())
+ self.assertEqual(o_data, server_data[:len(o_data)])
+ self.assertEqual(new_data, server_data[len(o_data):])
+ o_data = server_data
+ except Fault, f:
+ self.failIf(f.status == 401)
else:
self.assert_raises_fault(401, cl.update_object,
'c', 'o', StringIO(new_data),
account=get_user())
+
+ #check inheritance
+ o = self.upload_random_data('c', 'o/also-shared')
+ o_data = o['data']
+ for token, account in OTHER_ACCOUNTS.items():
+ cl = Pithos_Client(get_server(), token, account, get_api())
+ new_data = get_random_data()
+ if account in authorized or any:
+ # test write access
+ self.assert_not_raises_fault(401, cl.update_object,
+ 'c', o['name'],
+ StringIO(new_data),
+ account=get_user())
+ try:
+ server_data = cl.retrieve_object('c', o['name'], account=get_user())
+ self.assertEqual(o_data, server_data[:len(o_data)])
+ self.assertEqual(new_data, server_data[len(o_data):])
+ o_data = server_data
+ except Fault, f:
+ self.failIf(f.status == 401)
+ else:
+ self.assert_raises_fault(401, cl.update_object,
+ 'c', o['name'],
+ StringIO(new_data),
+ account=get_user())
+
+ def test_group_read(self):
+ self.client.create_container('c')
+ o = self.upload_random_data('c', 'o')
+ self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()])
+ self.assert_read(authorized=self.authorized)
+
+ def test_read_many(self):
+ #test read access
+ self.client.create_container('c')
+ o = self.upload_random_data('c', 'o')
+ self.client.share_object('c', 'o', self.authorized)
+ self.assert_read(authorized=self.authorized)
+
+ def test_read_by_everyone(self):
+ self.client.create_container('c')
+ o = self.upload_random_data('c', 'o')
+ self.client.share_object('c', 'o', ['*'])
+ self.assert_read(any=True)
+
+ def test_group_write(self):
+ self.client.create_container('c')
+ o = self.upload_random_data('c', 'o')
+ self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()], read=False)
+ self.assert_write(o['data'], authorized=self.authorized)
+
+ def test_write_many(self):
+ self.client.create_container('c')
+ o = self.upload_random_data('c', 'o')
+ self.client.share_object('c', 'o', self.authorized, read=False)
+ self.assert_write(o['data'], authorized=self.authorized)
+
+ def test_write_by_everyone(self):
+ self.client.create_container('c')
+ o = self.upload_random_data('c', 'o')
+ self.client.share_object('c', 'o', ['*'], read=False)
+ o_data = o['data']
+ self.assert_write(o['data'], any=True)
+
+class TestPublish(BaseTestCase):
+ def test_publish(self):
+ self.client.create_container('c')
+ o_data = self.upload_random_data('c', 'o')['data']
+ self.client.publish_object('c', 'o')
+ meta = self.client.retrieve_object_metadata('c', 'o')
+ self.assertTrue('x-object-public' in meta)
+ url = '/public/%s/c/o' % get_user()
+ self.assertEqual(meta['x-object-public'], url)
+ public_client = Pithos_Client(get_server(), get_auth(), get_user(), api='')
+ data = public_client.get(url)[2]
+ self.assertEqual(o_data, data)
class AssertMappingInvariant(object):
def __init__(self, callable, *args, **kwargs):
'photos/me.jpg']
if __name__ == "__main__":
- unittest.main()
+ if get_user() == 'test':
+ unittest.main()
+ else:
+ print 'Will not run tests as any other user except \'test\' (current user: %s).' % get_user()