obj['meta'] = args
path = '/%s/%s' % (container, name)
- self.client.create_object(container, name, StringIO(obj['data']),
- meta, **args)
+ self.client.create_object(container, name, f=StringIO(obj['data']),
+ meta=meta, **args)
return obj
except IOError:
self.assertEqual(int(zero_meta['content-length']), 0)
self.assertEqual(zero_hash, [])
self.assertEqual(zero_data, '')
+
+ def test_create_object_by_hashmap(self):
+ c = self.container
+ o = 'object'
+ self.upload_random_data(c, o)
+ hashmap = self.client.retrieve_object(c, o, format='json')
+ o2 = 'object-copy'
+ self.client.create_object_by_hashmap(c, o2, hashmap)
+ self.assertEqual(self.client.retrieve_object(c, o),
+ self.client.retrieve_object(c, o))
class ObjectCopy(BaseTestCase):
def setUp(self):
self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
#create a group
- self.authorized = ['chazapis', 'verigak', 'gtsouk', 'papagian']
+ self.authorized = ['chazapis', 'verigak', 'gtsouk']
groups = {'pithosdev':','.join(self.authorized)}
self.client.set_account_groups(**groups)
BaseTestCase.tearDown(self)
- def test_read_access(self):
- self.client.create_container('c')
- o = self.upload_random_data('c', 'o')
- self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()])
+ def assert_read(self, authorized=[], any=False):
for token, account in OTHER_ACCOUNTS.items():
cl = Pithos_Client(get_server(), token, account, get_api())
- if account in self.authorized:
+ if account in authorized or any:
self.assert_not_raises_fault(401, cl.retrieve_object_metadata,
'c', 'o', account=get_user())
else:
o = self.upload_random_data('c', 'o/also-shared')
for token, account in OTHER_ACCOUNTS.items():
cl = Pithos_Client(get_server(), token, account, get_api())
- if account in self.authorized:
+ if account in authorized or any:
self.assert_not_raises_fault(401, cl.retrieve_object_metadata,
'c', 'o/also-shared', account=get_user())
else:
self.assert_raises_fault(401, cl.retrieve_object_metadata,
'c', 'o/also-shared', account=get_user())
- def test_write_access(self):
- self.client.create_container('c')
- o = self.upload_random_data('c', 'o')
- self.client.share_object('c', 'o', ['chazapis'], read=False)
- o_data = o['data']
+ def assert_write(self, o_data, authorized=[], any=False):
for token, account in OTHER_ACCOUNTS.items():
cl = Pithos_Client(get_server(), token, account, get_api())
new_data = get_random_data()
- if account in [get_user(), 'chazapis']:
+ if account in authorized or any:
+ # test write access
self.assert_not_raises_fault(401, cl.update_object,
'c', 'o', StringIO(new_data),
account=get_user())
- server_data = self.client.retrieve_object('c', 'o')
- self.assertEqual(o_data, server_data[:len(o_data)])
- self.assertEqual(new_data, server_data[len(o_data):])
- o_data = server_data
+ try:
+ # test read access
+ server_data = cl.retrieve_object('c', 'o', account=get_user())
+ self.assertEqual(o_data, server_data[:len(o_data)])
+ self.assertEqual(new_data, server_data[len(o_data):])
+ o_data = server_data
+ except Fault, f:
+ self.failIf(f.status == 401)
else:
self.assert_raises_fault(401, cl.update_object,
'c', 'o', StringIO(new_data),
for token, account in OTHER_ACCOUNTS.items():
cl = Pithos_Client(get_server(), token, account, get_api())
new_data = get_random_data()
- if account in [get_user(), 'chazapis']:
+ if account in authorized or any:
+ # test write access
self.assert_not_raises_fault(401, cl.update_object,
'c', o['name'],
StringIO(new_data),
account=get_user())
- server_data = self.client.retrieve_object('c', o['name'])
- self.assertEqual(o_data, server_data[:len(o_data)])
- self.assertEqual(new_data, server_data[len(o_data):])
- o_data = server_data
+ try:
+ server_data = cl.retrieve_object('c', o['name'], account=get_user())
+ self.assertEqual(o_data, server_data[:len(o_data)])
+ self.assertEqual(new_data, server_data[len(o_data):])
+ o_data = server_data
+ except Fault, f:
+ self.failIf(f.status == 401)
else:
self.assert_raises_fault(401, cl.update_object,
'c', o['name'],
StringIO(new_data),
account=get_user())
+
+ def test_group_read(self):
+ self.client.create_container('c')
+ o = self.upload_random_data('c', 'o')
+ self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()])
+ self.assert_read(authorized=self.authorized)
+
+ def test_read_many(self):
+ #test read access
+ self.client.create_container('c')
+ o = self.upload_random_data('c', 'o')
+ self.client.share_object('c', 'o', self.authorized)
+ self.assert_read(authorized=self.authorized)
+
+ def test_read_by_everyone(self):
+ self.client.create_container('c')
+ o = self.upload_random_data('c', 'o')
+ self.client.share_object('c', 'o', ['*'])
+ self.assert_read(any=True)
+
+ def test_group_write(self):
+ self.client.create_container('c')
+ o = self.upload_random_data('c', 'o')
+ self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()], read=False)
+ self.assert_write(o['data'], authorized=self.authorized)
+
+ def test_write_many(self):
+ self.client.create_container('c')
+ o = self.upload_random_data('c', 'o')
+ self.client.share_object('c', 'o', self.authorized, read=False)
+ self.assert_write(o['data'], authorized=self.authorized)
+
+ def test_write_by_everyone(self):
+ self.client.create_container('c')
+ o = self.upload_random_data('c', 'o')
+ self.client.share_object('c', 'o', ['*'], read=False)
+ o_data = o['data']
+ self.assert_write(o['data'], any=True)
+
+class TestPublish(BaseTestCase):
+ def test_publish(self):
+ self.client.create_container('c')
+ o_data = self.upload_random_data('c', 'o')['data']
+ self.client.publish_object('c', 'o')
+ meta = self.client.retrieve_object_metadata('c', 'o')
+ self.assertTrue('x-object-public' in meta)
+ url = '/public/%s/c/o' % get_user()
+ self.assertEqual(meta['x-object-public'], url)
+ public_client = Pithos_Client(get_server(), get_auth(), get_user(), api='')
+ data = public_client.get(url)[2]
+ self.assertEqual(o_data, data)
class AssertMappingInvariant(object):
def __init__(self, callable, *args, **kwargs):