def object_copy(self, object, destination, format='json', ignore_content_type=False,
if_etag_match=None, if_etag_not_match=None, destination_account=None,
content_type=None, content_encoding=None, content_disposition=None, source_version=None,
- manifest=None, permitions={}, public=False, metadata={}, *args, **kwargs):
+ permitions={}, public=False, metadata={}, *args, **kwargs):
""" Full Pithos+ COPY at object level
--- request parameters ---
@param format (string): json (default) or xml
@param content_encoding (string): The encoding of the object
@param content_disposition (string): The presentation style of the object
@param source_version (string): The source version to copy from
- @param manifest (string): Object parts prefix in /<container>/<object> form
@param permitions (dict): Object permissions in the form (all fields are optional)
{'read':[user1, group1, user2, ...], 'write':[user3, group2, group3, ...]}
permitions override the source permissions, removing any old permissions
self.set_header('Content-Encoding', content_encoding)
self.set_header('Content-Disposition', content_disposition)
self.set_header('X-Source-Version', source_version)
- self.set_header('X-Object-Manifest', manifest)
perms = None
for permition_type, permition_list in permitions.items():
if perms is None:
def object_move(self, object, format='json', ignore_content_type=False,
if_etag_match=None, if_etag_not_match=None, destination=None, destination_account=None,
- content_type=None, content_encoding=None, content_disposition=None, manifest=None,
- permitions={}, public=False, metadata={}, *args, **kwargs):
+ content_type=None, content_encoding=None, content_disposition=None, permitions={},
+ public=False, metadata={}, *args, **kwargs):
""" Full Pithos+ MOVE at object level
--- request parameters ---
@param format (string): json (default) or xml
@param content_encoding (string): The encoding of the object
@param content_disposition (string): The presentation style of the object
@param source_version (string): The source version to copy from
- @param manifest (string): Object parts prefix in /<container>/<object> form
@param permitions (dict): Object permissions in the form (all fields are optional)
{'read':[user1, group1, user2, ...], 'write':[user3, group2, group3, ...]}
@param public (bool): If true, Object is publicly accessible, if false, not
self.set_header('Content-Type', content_type)
self.set_header('Content-Encoding', content_encoding)
self.set_header('Content-Disposition', content_disposition)
- self.set_header('X-Object-Manifest', manifest)
perms = None
for permition_type, permition_list in permitions.items():
if perms is None:
name = obj['name']
self.client.reset_headers()
self.client.object_delete(name)
- #print('Just deleted '+name+' in '+container)
self.client.reset_headers()
self.client.container_delete()
self.client.reset_headers()
def tearDown(self):
"""Destroy test cases"""
- print('destroy test ...')
if self.fname is not None:
try:
os.remove(self.fname)
pass
self.client.container=''
- def atest_account_head(self):
+ def test_account_head(self):
"""Test account_HEAD"""
r = self.client.account_head()
self.assertEqual(r.status_code, 204)
self.client.reset_headers()
self.assertNotEqual(r1.status_code, r2.status_code)
- def atest_account_get(self):
+ def test_account_get(self):
"""Test account_GET"""
r = self.client.account_get()
self.assertEqual(r.status_code, 200)
self.client.reset_headers()
self.assertNotEqual(r1.status_code, r2.status_code)
- def atest_account_post(self):
+ def test_account_post(self):
"""Test account_POST"""
r = self.client.account_post()
self.assertEqual(r.status_code, 202)
you don't have permitions to modify those at account level
"""
- def atest_container_head(self):
+ def test_container_head(self):
"""Test container_HEAD"""
self.client.container = self.c1
self.client.reset_headers()
self.assertNotEqual(r1.status_code, r2.status_code)
- def atest_container_get(self):
+ def test_container_get(self):
"""Test container_GET"""
self.client.container = self.c1
self.assertNotEqual(r1.status_code, r2.status_code)
self.client.reset_headers()
- def atest_container_put(self):
+ def test_container_put(self):
"""Test container_PUT"""
self.client.container = self.c2
self.client.del_container_meta(self.client.container)
- def atest_container_post(self):
+ def test_container_post(self):
"""Test container_POST"""
self.client.container = self.c2
r = self.client.del_container_meta('m2')
self.client.reset_headers()
- def atest_container_delete(self):
+ def test_container_delete(self):
"""Test container_DELETE"""
"""Fail to delete a non-empty container"""
self.assertEqual(r.status_code, 204)
self.client.reset_headers()
- def atest_object_head(self):
+ def test_object_head(self):
+ """Test object_HEAD"""
self.client.container = self.c2
obj = 'test'
self.assertNotEqual(r1.status_code, r2.status_code)
self.client.reset_headers()
- def atest_object_get(self):
+ def test_object_get(self):
+ """Test object_GET"""
self.client.container = self.c1
obj = 'test'
self.assertNotEqual(r1.status_code, r2.status_code)
self.client.reset_headers()
- def atest_object_put(self):
+ def test_object_put(self):
"""test object_PUT"""
self.client.container = self.c2
self.assertTrue(r.has_key('x-object-public'))
self.client.reset_headers()
- """Still untested: manifest"""
-
- def atest_object_move(self):
- self.client.container='testCo0'
- obj = 'obj'+unicode(self.now)
+ def test_object_move(self):
+ """Test object_MOVE"""
+ self.client.container= self.c2
+ obj = 'test2'
data= '{"key1":"val1", "key2":"val2"}'
r = self.client.object_put(obj+'orig', content_type='application/octet-stream',
data= data, metadata={'mkey1':'mval1', 'mkey2':'mval2'},
permitions={'read':['accX:groupA', 'u1', 'u2'], 'write':['u2', 'u3']})
self.client.reset_headers()
- r = self.client.object_move(obj+'orig', destination = '/'+self.client.container+'/'+obj, ignore_content_type=False, content_type='application/json',
+ r = self.client.object_move(obj+'orig', destination = '/'+self.client.container+'/'+obj,
+ ignore_content_type=False, content_type='application/json',
metadata={'mkey2':'mval2a', 'mkey3':'mval3'},
permitions={'write':['u5', 'accX:groupB']})
self.assertEqual(r.status_code, 201)
self.client.reset_headers()
"""Check Metadata"""
- r = self.client.object_get(obj)
- self.assertTrue(r.headers.has_key('x-object-meta-mkey1'))
- self.assertEqual(r.headers['x-object-meta-mkey1'], 'mval1')
- self.assertTrue(r.headers.has_key('x-object-meta-mkey2'))
- self.assertEqual(r.headers['x-object-meta-mkey2'], 'mval2a')
- self.assertTrue(r.headers.has_key('x-object-meta-mkey3'))
- self.assertEqual(r.headers['x-object-meta-mkey3'], 'mval3')
+ r = self.client.get_object_meta(obj)
+ self.assertEqual(r['mkey1'], 'mval1')
+ self.assertEqual(r['mkey2'], 'mval2a')
+ self.assertEqual(r['mkey3'], 'mval3')
+ self.client.reset_headers()
+
"""Check permitions"""
- self.assertFalse('read' in r.headers['x-object-sharing'])
- self.assertFalse('u2' in r.headers['x-object-sharing'])
- self.assertTrue('write' in r.headers['x-object-sharing'])
- self.assertTrue('accx:groupb' in r.headers['x-object-sharing'])
+ r = self.client.get_object_sharing(obj)
+ self.assertFalse(r.has_key('read'))
+ self.assertTrue('u5' in r['write'])
+ self.assertTrue('accx:groupb' in r['write'])
self.client.reset_headers()
"""Check destination account"""
- r = self.client.object_move(obj, destination='/testCo/'+obj, content_encoding='utf8',
+ r = self.client.object_move(obj, destination='/%s/%s'%(self.c1,obj), content_encoding='utf8',
content_type='application/json', destination_account='nonExistendAddress@NeverLand.com',
success=(201, 403))
self.assertEqual(r.status_code, 403)
self.client.reset_headers()
- """Check destination being another container and also content_type and content encoding"""
- r = self.client.object_move(obj, destination='/testCo/'+obj,
- content_encoding='utf8', content_type='application/json')
+ """Check destination being another container and also
+ content_type, content_disposition and content encoding"""
+ r = self.client.object_move(obj, destination='/%s/%s'%(self.c1,obj),
+ content_encoding='utf8', content_type='application/json',
+ content_disposition='attachment; filename="fname.ext"')
self.assertEqual(r.status_code, 201)
self.assertEqual(r.headers['content-type'], 'application/json; charset=UTF-8')
self.client.reset_headers()
+ r = self.client.container=self.c1
+ r = self.client.get_object_info(obj)
+ self.assertTrue(r.has_key('content-disposition') and 'fname.ext' in r['content-disposition'])
+ etag = r['etag']
+ ctype = r['content-type']
+ self.assertEqual(ctype, 'application/json')
self.client.reset_headers()
"""Check ignore_content_type and content_type"""
- r = self.client.container='testCo'
- r = self.client.object_get(obj)
- etag = r.headers['etag']
- ctype = r.headers['content-type']
- self.assertEqual(ctype, 'application/json')
- self.client.reset_headers()
- r = self.client.object_move(obj, destination = '/testCo0/'+obj,
+ r = self.client.object_move(obj, destination = '/%s/%s'%(self.c2,obj),
ignore_content_type=True, content_type='application/json')
self.assertEqual(r.status_code, 201)
self.assertNotEqual(r.headers['content-type'], 'application/json')
- r = self.client.container='testCo0'
self.client.reset_headers()
"""Check if_etag_(not_)match"""
- r = self.client.object_move(obj, destination='/'+self.client.container+'/'+obj+'0', if_etag_match=etag)
+ r = self.client.container=self.c2
+ r = self.client.object_move(obj, destination='/'+self.client.container+'/'+obj+'0',
+ if_etag_match=etag)
self.assertEqual(r.status_code, 201)
self.client.reset_headers()
- r = self.client.object_move(obj+'0', destination='/'+self.client.container+'/'+obj+'1', if_etag_not_match='lalala')
+ r = self.client.object_move(obj+'0', destination='/'+self.client.container+'/'+obj+'1',
+ if_etag_not_match='lalala')
self.assertEqual(r.status_code, 201)
self.client.reset_headers()
"""Check public and format """
- r = self.client.object_move(obj+'1', destination='/'+self.client.container+'/'+obj+'2', format='xml', public=True)
+ r = self.client.object_move(obj+'1', destination='/'+self.client.container+'/'+obj+'2',
+ format='xml', public=True)
self.assertEqual(r.status_code, 201)
self.assertTrue(r.headers['content-type'].index('xml') > 0)
self.client.reset_headers()
- r = self.client.object_get(obj+'2')
- self.assertTrue(r.headers.has_key('x-object-public'))
+ r = self.client.get_object_info(obj+'2')
+ self.assertTrue(r.has_key('x-object-public'))
self.client.reset_headers()
- """Still untested: content_disposition, manifest"""
- self.client.delete_object(obj+'2')
- self.client.container=''
-
- def atest_object_post(self):
- self.client.container='testCo0'
- obj = 'obj'+unicode(self.now)
+ def test_object_post(self):
+ """Test object_POST"""
+ self.client.container=self.c2
+ obj = 'test2'
"""create a filesystem file"""
- newf = open(obj, 'w')
+ self.fname = obj
+ newf = open(self.fname, 'w')
newf.writelines(['ello!\n','This is a test line\n','inside a test file\n'])
+ newf.close()
"""create a file on container"""
r = self.client.object_put(obj, content_type='application/octet-stream',
data= 'H', metadata={'mkey1':'mval1', 'mkey2':'mval2'},
self.client.set_object_meta(obj, {'mkey2':'mval2a', 'mkey3':'mval3'})
self.client.reset_headers()
r = self.client.get_object_meta(obj)
- self.assertTrue(r.has_key('x-object-meta-mkey1'))
- self.assertEqual(r['x-object-meta-mkey1'], 'mval1')
- self.assertTrue(r.has_key('x-object-meta-mkey2'))
- self.assertEqual(r['x-object-meta-mkey2'], 'mval2a')
- self.assertTrue(r.has_key('x-object-meta-mkey3'))
- self.assertEqual(r['x-object-meta-mkey3'], 'mval3')
+ self.assertEqual(r['mkey1'], 'mval1')
+ self.assertEqual(r['mkey2'], 'mval2a')
+ self.assertEqual(r['mkey3'], 'mval3')
self.client.reset_headers()
self.client.del_object_meta('mkey1', obj)
self.client.reset_headers()
r = self.client.get_object_meta(obj)
- self.assertFalse(r.has_key('x-object-meta-mkey1'))
+ self.assertFalse(r.has_key('mkey1'))
self.client.reset_headers()
"""Check permitions"""
- self.client.set_object_sharing(obj, read_permition=['u4', 'u5'], write_permition=['u4'])
+ self.client.set_object_sharing(obj,
+ read_permition=['u4', 'u5'], write_permition=['u4'])
self.client.reset_headers()
r = self.client.get_object_sharing(obj)
self.assertTrue(r.has_key('read'))
- self.assertTrue(r['read'].index('u5') >= 0)
+ self.assertTrue('u5' in r['read'])
self.assertTrue(r.has_key('write'))
- self.assertTrue(r['write'].index('u4') >= 0)
+ self.assertTrue('u4' in r['write'])
self.client.reset_headers()
self.client.del_object_sharing(obj)
r = self.client.get_object_sharing(obj)
self.assertEqual(r['content-encoding'], 'application/json')
self.client.reset_headers()
- """Check source_version and source_account"""
+ """Check source_version and source_account and content_disposition"""
r = self.client.object_post(obj, update=True, content_type='application/octet-srteam',
- content_length=5, content_range='bytes 1-5/*', source_object='/testCo0/'+obj,
- source_account='thisAccountWillNeverExist@adminland.com', source_version=helloVersion, data='12345',
- success=(403, 202, 204))
+ content_length=5, content_range='bytes 1-5/*', source_object='/%s/%s'%(self.c2,obj),
+ source_account='thisAccountWillNeverExist@adminland.com',
+ source_version=helloVersion, data='12345', success=(403, 202, 204))
self.assertEqual(r.status_code, 403)
self.client.reset_headers()
r = self.client.object_post(obj, update=True, content_type='application/octet-srteam',
- content_length=5, content_range='bytes 1-5/*', source_object='/testCo0/'+obj,
- source_account='thisAccountWillNeverExist@adminland.com', source_version=helloVersion, data='12345')
+ content_length=5, content_range='bytes 1-5/*', source_object='/%s/%s'%(self.c2,obj),
+ source_account=self.client.account, source_version=helloVersion, data='12345',
+ content_disposition='attachment; filename="fname.ext"')
self.client.reset_headers()
r = self.client.object_get(obj)
self.assertEqual(r.text, 'eello!')
+ self.assertTrue(r.headers.has_key('content-disposition')
+ and 'fname.ext' in r.headers['content-disposition'])
self.client.reset_headers()
- """We need to check transfer_encoding, content_disposition, manifest """
+ """Check manifest"""
+ mobj = 'manifest.test'
+ txt = ''
+ for i in range(10):
+ txt += '%s'%i
+ r = self.client.object_put('%s/%s'%(mobj, i), data='%s'%i,
+ content_encoding='application/octet-stream',
+ content_length=1, success=201)
+ self.client.reset_headers()
+ self.client.object_put(mobj, content_length=0)
+ self.client.reset_headers()
+ r = self.client.object_post(mobj, manifest='%s/%s'%(self.client.container, mobj))
+ self.client.reset_headers()
+ r = self.client.object_get(mobj)
+ self.assertEqual(r.text, txt)
+ self.client.reset_headers()
- self.client.object_delete(obj)
- os.remove(obj)
- self.client.container=''
+ """We need to check transfer_encoding """
- def atest_object_delete(self):
- self.client.container='testCo0'
- obj = 'obj'+unicode(self.now)
+ def test_object_delete(self):
+ """Test object_DELETE"""
+ self.client.container=self.c2
+ obj = 'test2'
"""create a file on container"""
r = self.client.object_put(obj, content_type='application/octet-stream',
data= 'H', metadata={'mkey1':'mval1', 'mkey2':'mval2'},
r = self.client.object_get(obj, success=(200, 404))
self.assertEqual(r.status_code, 404)
- self.client.container = ''
-
class testCyclades(unittest.TestCase):
def setUp(self):
pass
suiteFew = unittest.TestSuite()
#kamaki/pithos.py
- """
suiteFew.addTest(testPithos('test_account_head'))
suiteFew.addTest(testPithos('test_account_get'))
suiteFew.addTest(testPithos('test_account_post'))
suiteFew.addTest(testPithos('test_object_head'))
suiteFew.addTest(testPithos('test_object_get'))
suiteFew.addTest(testPithos('test_object_put'))
- """
suiteFew.addTest(testPithos('test_object_copy'))
- """
suiteFew.addTest(testPithos('test_object_move'))
suiteFew.addTest(testPithos('test_object_post'))
suiteFew.addTest(testPithos('test_object_delete'))
- """
#kamaki/cyclades.py
#suiteFew.addTest(testCyclades('test_list_servers'))