from kamaki.clients import pithos, cyclades, ClientError
class testPithos(unittest.TestCase):
+ """Set up a Pithos+ thorough test"""
def setUp(self):
url = 'http://127.0.0.1:8000/v1'
token = 'C/yBXmz3XjTFBnujc2biAg=='
def makeNewObject(self, container, obj):
self.client.container = container
self.client.object_put(obj, content_type='application/octet-stream',
- data= obj+' '+container, metadata={'incontainer':container})
+ data= 'file '+obj+' that lives in '+container,
+ metadata={'incontainer':container})
self.client.reset_headers()
def forceDeleteContainer(self, container):
self.container = ''
def tearDown(self):
+ """Destroy test cases"""
+ print('destroy test ...')
if self.fname is not None:
try:
os.remove(self.fname)
self.client.container=''
def atest_account_head(self):
+ """Test account_HEAD"""
r = self.client.account_head()
self.assertEqual(r.status_code, 204)
r = self.client.account_head(until='1000000000')
"""Check if(un)modified_since"""
for format in self.client.DATE_FORMATS:
now_formated = self.now_unformated.strftime(format)
- r1 = self.client.account_head(if_modified_since=now_formated, success=(204, 412))
+ r1 = self.client.account_head(if_modified_since=now_formated, success=(204, 304, 412))
self.client.reset_headers()
- r2 = self.client.account_head(if_unmodified_since=now_formated, success=(204, 412))
+ r2 = self.client.account_head(if_unmodified_since=now_formated, success=(204, 304, 412))
self.client.reset_headers()
self.assertNotEqual(r1.status_code, r2.status_code)
def atest_account_get(self):
+ """Test account_GET"""
r = self.client.account_get()
self.assertEqual(r.status_code, 200)
fullLen = len(r.json)
"""Check if(un)modified_since"""
for format in self.client.DATE_FORMATS:
now_formated = self.now_unformated.strftime(format)
- r1 = self.client.account_get(if_modified_since=now_formated, success=(200, 412))
+ r1 = self.client.account_get(if_modified_since=now_formated, success=(200, 304, 412))
self.client.reset_headers()
- r2 = self.client.account_get(if_unmodified_since=now_formated, success=(200, 412))
+ r2 = self.client.account_get(if_unmodified_since=now_formated, success=(200, 304, 412))
self.client.reset_headers()
self.assertNotEqual(r1.status_code, r2.status_code)
def atest_account_post(self):
+ """Test account_POST"""
r = self.client.account_post()
self.assertEqual(r.status_code, 202)
grpName = 'grp'+unicode(self.now)
"""
def atest_container_head(self):
+ """Test container_HEAD"""
self.client.container = self.c1
r = self.client.container_head()
self.assertNotEqual(r1.status_code, r2.status_code)
def atest_container_get(self):
+ """Test container_GET"""
self.client.container = self.c1
r = self.client.container_get()
self.client.reset_headers()
def atest_container_put(self):
+ """Test container_PUT"""
self.client.container = self.c2
r = self.client.container_put()
self.client.del_container_meta(self.client.container)
def atest_container_post(self):
+ """Test container_POST"""
self.client.container = self.c2
r = self.client.container_post()
self.assertEqual('none', nvers)
self.client.reset_headers()
- """Haven't figured out how to test put_block, which
- uses content_type and content_length to post blocks
- of data to container. But how do you check that
- the blocks are there?"""
+ """put_block uses content_type and content_length to
+ post blocks of data 2 container. All that in upload_object"""
"""Change a file at fs"""
self.fname = 'f'+unicode(self.now)
copyfile('pirifi.237M', self.fname)
self.client.create_directory('dir')
self.client.reset_headers()
newf = open(self.fname, 'r')
- self.client.create_object('/dir/sample.file', newf)
+ self.client.upload_object('/dir/sample.file', newf)
self.client.reset_headers()
newf.close()
"""Check if file has been uploaded"""
self.assertTrue(int(r['content-length']) > 248209936)
"""WTF is tranfer_encoding? What should I check about th** s**t? """
- r = self.client.container_post(update=True, transfer_encoding='xlm')
- self.client.reset_headers()
+ #TODO
"""Check update=False"""
r = self.client.object_post('test', update=False, metadata={'newmeta':'newval'})
r = self.client.del_container_meta('m2')
self.client.reset_headers()
- def test_container_delete(self):
- """Test container_delete"""
+ def atest_container_delete(self):
+ """Test container_DELETE"""
"""Fail to delete a non-empty container"""
self.client.container = self.c2
self.client.reset_headers()
def atest_object_head(self):
- self.client.container = 'testCo0'
- obj = 'lolens'
+ self.client.container = self.c2
+ obj = 'test'
r = self.client.object_head(obj)
self.assertEqual(r.status_code, 200)
etag = r.headers['etag']
r = self.client.object_head(obj, version=40)
- self.assertEqual(r.status_code, 200)
+ self.assertEqual(r.headers['x-object-version'], '40')
self.client.reset_headers()
r = self.client.object_head(obj, if_etag_match=etag)
self.assertEqual(r.status_code, 200)
- """I believe if_etag_not_match does not work..."""
self.client.reset_headers()
+ r = self.client.object_head(obj, if_etag_not_match=etag, success=(200, 412, 304))
+ self.assertNotEqual(r.status_code, 200)
r = self.client.object_head(obj, version=40, if_etag_match=etag, success=412)
self.assertEqual(r.status_code, 412)
self.client.reset_headers()
- """I believe if_un/modified_since does not work..."""
- r=self.client.object_head(obj, if_modified_since=self.now)
- self.assertEqual(r.status_code, 200)
- self.client.reset_headers()
-
- r=self.client.object_head(obj, if_unmodified_since=self.now)
- self.assertEqual(r.status_code, 200)
-
- self.client.container = ''
- self.client.reset_headers()
+ """Check and if(un)modified_since"""
+ for format in self.client.DATE_FORMATS:
+ now_formated = self.now_unformated.strftime(format)
+ self.client.reset_headers()
+ r1 = self.client.object_head(obj, if_modified_since=now_formated, success=(200, 304, 412))
+ self.client.reset_headers()
+ r2 = self.client.object_head(obj, if_unmodified_since=now_formated, success=(200, 304, 412))
+ self.client.reset_headers()
+ self.assertNotEqual(r1.status_code, r2.status_code)
+ self.client.reset_headers()
def atest_object_get(self):
- self.client.container = 'testCo'
+ self.client.container = self.c1
+ obj = 'test'
- r = self.client.object_get('lolens')
+ r = self.client.object_get(obj)
self.assertEqual(r.status_code, 200)
+
osize = int(r.headers['content-length'])
etag = r.headers['etag']
- r = self.client.object_get('lolens', hashmap=True)
+ r = self.client.object_get(obj, hashmap=True)
self.assertTrue(r.json.has_key('hashes') \
and r.json.has_key('block_hash') \
and r.json.has_key('block_size') \
and r.json.has_key('bytes'))
- r = self.client.object_get('lolens', format='xml', hashmap=True)
+ r = self.client.object_get(obj, format='xml', hashmap=True)
self.assertEqual(len(r.text.split('hash>')), 3)
self.client.reset_headers()
rangestr = 'bytes=%s-%s'%(osize/3, osize/2)
- r = self.client.object_get('lolens', data_range=rangestr, success=(200, 206))
+ r = self.client.object_get(obj, data_range=rangestr, success=(200, 206))
partsize = int(r.headers['content-length'])
self.assertTrue(0 < partsize and partsize <= 1+osize/3)
self.client.reset_headers()
rangestr = 'bytes=%s-%s'%(osize/3, osize/2)
- r = self.client.object_get('lolens', data_range=rangestr, if_range=True, success=(200, 206))
+ r = self.client.object_get(obj, data_range=rangestr, if_range=True, success=(200, 206))
partsize = int(r.headers['content-length'])
self.assertTrue(0 < partsize and partsize <= 1+osize/3)
self.client.reset_headers()
- r = self.client.object_get('lolens', if_etag_match=etag)
+ r = self.client.object_get(obj, if_etag_match=etag)
self.assertEqual(r.status_code, 200)
self.client.reset_headers()
- r = self.client.object_get('lolens', if_etag_not_match=etag+'LALALA')
- self.assertEqual(r.status_code, 200)
-
- """I believe if_un/modified_since does not work..."""
- r=self.client.object_get('lolens', if_modified_since=self.now)
+ r = self.client.object_get(obj, if_etag_not_match=etag+'LALALA')
self.assertEqual(r.status_code, 200)
self.client.reset_headers()
- r=self.client.object_get('lolens', if_unmodified_since=self.now)
- self.assertEqual(r.status_code, 200)
-
- self.client.container = ''
- self.client.reset_headers()
+ """Check and if(un)modified_since"""
+ for format in self.client.DATE_FORMATS:
+ now_formated = self.now_unformated.strftime(format)
+ self.client.reset_headers()
+ r1 = self.client.object_get(obj, if_modified_since=now_formated, success=(200, 304, 412))
+ self.client.reset_headers()
+ r2 = self.client.object_get(obj, if_unmodified_since=now_formated, success=(200, 304, 412))
+ self.client.reset_headers()
+ self.assertNotEqual(r1.status_code, r2.status_code)
+ self.client.reset_headers()
def atest_object_put(self):
- self.client.container = 'testCo0'
- obj='obj'+unicode(self.now)
+ """test object_PUT"""
+ self.client.container = self.c2
+ obj='another.test'
+
+ """create the object"""
r = self.client.object_put(obj, data='a', content_type='application/octer-stream',
permitions={'read':['accX:groupA', 'u1', 'u2'], 'write':['u2', 'u3']},
- metadata={'key1':'val1', 'key2':'val2'})
+ metadata={'key1':'val1', 'key2':'val2'}, content_encoding='UTF-8',
+ content_disposition='attachment; filename="fname.ext"')
self.assertEqual(r.status_code, 201)
+ etag = r.headers['etag']
self.client.reset_headers()
- r = self.client.object_get(obj)
+ """Check content-disposition"""
+ r = self.client.get_object_info(obj)
+ self.assertTrue(r.has_key('content-disposition'))
+
"""Check permitions"""
- perms = r.headers['x-object-sharing']
- self.assertTrue(perms.index('u1') < perms.index('write') < perms.index('u2'))
+ r = self.client.get_object_sharing(obj)
+ self.assertTrue('accx:groupa' in r['read'])
+ self.assertTrue('u1' in r['read'])
+ self.assertTrue('u2' in r['write'])
+ self.assertTrue('u3' in r['write'])
+ self.client.reset_headers()
+
"""Check metadata"""
- self.assertTrue(r.headers.has_key('x-object-meta-key1'))
- self.assertEqual(r.headers['x-object-meta-key1'], 'val1')
- self.assertTrue(r.headers.has_key('x-object-meta-key2'))
- self.assertEqual(r.headers['x-object-meta-key2'], 'val2')
- vers1 = int(r.headers['x-object-version'])
- self.assertEqual(r.headers['content-length'], '1')
- etag = r.headers['etag']
+ r = self.client.get_object_meta(obj)
+ self.assertEqual(r['key1'], 'val1')
+ self.assertEqual(r['key2'], 'val2')
self.client.reset_headers()
+ """Check public and if_etag_match"""
r = self.client.object_put(obj, if_etag_match=etag, data='b',
content_type='application/octet-stream', public=True)
self.client.reset_headers()
r = self.client.object_get(obj)
- """Check public"""
+ self.client.reset_headers()
self.assertTrue(r.headers.has_key('x-object-public'))
vers2 = int(r.headers['x-object-version'])
etag = r.headers['etag']
self.assertEqual(r.text, 'b')
self.client.reset_headers()
- r = self.client.object_put(obj, if_etag_not_match=etag, data='c', content_type='application/octet-stream', success=(201, 412))
+ """Check if_etag_not_match"""
+ r = self.client.object_put(obj, if_etag_not_match=etag, data='c',
+ content_type='application/octet-stream', success=(201, 412))
self.assertEqual(r.status_code, 412)
self.client.reset_headers()
+ """Check content_type and content_length"""
tmpdir = 'dir'+unicode(self.now)
- r = self.client.object_put(tmpdir, content_type='application/directory', content_length=0)
+ r = self.client.object_put(tmpdir, content_type='application/directory',
+ content_length=0)
self.client.reset_headers()
- r = self.client.object_get(tmpdir)
- self.assertEqual(r.headers['content-type'], 'application/directory')
+ r = self.client.get_object_info(tmpdir)
+ self.assertEqual(r['content-type'], 'application/directory')
self.client.reset_headers()
+ """Check copy_from, content_encoding"""
r = self.client.object_put('%s/%s'%(tmpdir, obj), format=None,
copy_from='/%s/%s'%(self.client.container, obj),
content_encoding='application/octet-stream',
- source_account=self.account,
+ source_account=self.client.account,
content_length=0, success=201)
self.assertEqual(r.status_code, 201)
self.client.reset_headers()
- self.client.container = 'testCo'
- fromstr = '/testCo0/'+tmpdir+'/'+obj
+ """Check cross-container copy_from, content_encoding"""
+ self.client.container = self.c1
+ fromstr = '/'+self.c2+'/'+tmpdir+'/'+obj
r = self.client.object_put(obj, format=None, copy_from=fromstr,
content_encoding='application/octet-stream',
- source_account=self.account,
+ source_account=self.client.account,
content_length=0, success=201)
self.assertEqual(r.status_code, 201)
self.client.reset_headers()
- r = self.client.object_get(obj)
- self.assertEqual(r.headers['etag'], etag)
+ r = self.client.get_object_info(obj)
+ self.assertEqual(r['etag'], etag)
self.client.reset_headers()
- self.client.container = 'testCo0'
- fromstr = '/testCo/'+obj
+ """Check source_account"""
+ self.client.container = self.c2
+ fromstr = '/'+self.c1+'/'+obj
r = self.client.object_put(obj+'v2', format=None, move_from=fromstr,
content_encoding='application/octet-stream',
source_account='nonExistendAddress@NeverLand.com',
self.assertEqual(r.status_code, 403)
self.client.reset_headers()
+ """Check cross-container move_from"""
r = self.client.object_put(obj+'v0', format=None,
- move_from='/testCo/'+obj,
+ move_from='/'+self.c1+'/'+obj,
content_encoding='application/octet-stream',
content_length=0, success=201)
self.assertEqual(r.status_code, 201)
self.client.reset_headers()
- r = self.client.object_get(obj+'v0')
- self.assertEqual(r.headers['etag'], etag)
+ r = self.client.get_object_info(obj+'v0')
+ self.assertEqual(r['etag'], etag)
self.client.reset_headers()
+ """Check move_from"""
r = self.client.object_put(obj+'v1', format=None,
- move_from='/testCo0/'+obj,
+ move_from='/'+self.c2+'/'+obj,
source_version = vers2,
content_encoding='application/octet-stream',
content_length=0, success=201)
self.client.reset_headers()
- """Some problems with transfer-encoding?
- Content_disposition? manifest?"""
-
- self.client.delete_object(tmpdir+'/'+obj)
- self.client.delete_object(tmpdir)
- self.client.delete_object(obj+'v1')
- self.client.delete_object(obj+'v0')
- self.client.container = ''
+ """Check manifest"""
+ mobj = 'manifest.test'
+ txt = ''
+ for i in range(10):
+ txt += '%s'%i
+ r = self.client.object_put('%s/%s'%(mobj, i), data='%s'%i,
+ content_encoding='application/octet-stream',
+ content_length=1, success=201)
+ self.client.reset_headers()
+ self.client.object_put(mobj, content_length=0,
+ manifest='%s/%s'%(self.client.container, mobj))
+ self.client.reset_headers()
+ r = self.client.object_get(mobj)
+ self.assertEqual(r.text, txt)
self.client.reset_headers()
- def atest_object_copy(self):
- self.client.container='testCo0'
- obj = 'obj'+unicode(self.now)
+ """Some problems with transfer-encoding?"""
+
+ def test_object_copy(self):
+ """test object_COPY"""
+ self.client.container=self.c2
+ obj = 'test2'
data= '{"key1":"val1", "key2":"val2"}'
r = self.client.object_put(obj+'orig', content_type='application/octet-stream',
data= data, metadata={'mkey1':'mval1', 'mkey2':'mval2'},
- permitions={'read':['accX:groupA', 'u1', 'u2'], 'write':['u2', 'u3']})
- self.client.reset_headers()
- r = self.client.object_copy(obj+'orig', destination = '/'+self.client.container+'/'+obj, ignore_content_type=False, content_type='application/json',
+ permitions={
+ 'read':['accX:groupA', 'u1', 'u2'],
+ 'write':['u2', 'u3']},
+ content_disposition='attachment; filename="fname.ext"')
+ self.client.reset_headers()
+ r = self.client.object_copy(obj+'orig',
+ destination = '/'+self.client.container+'/'+obj,
+ ignore_content_type=False, content_type='application/json',
metadata={'mkey2':'mval2a', 'mkey3':'mval3'},
permitions={'write':['u5', 'accX:groupB']})
self.assertEqual(r.status_code, 201)
self.client.reset_headers()
+ """Check content-disposition"""
+ r = self.client.get_object_info(obj)
+ self.assertTrue(r.has_key('content-disposition'))
+
"""Check Metadata"""
- r = self.client.object_get(obj)
- self.assertTrue(r.headers.has_key('x-object-meta-mkey1'))
- self.assertEqual(r.headers['x-object-meta-mkey1'], 'mval1')
- self.assertTrue(r.headers.has_key('x-object-meta-mkey2'))
- self.assertEqual(r.headers['x-object-meta-mkey2'], 'mval2a')
- self.assertTrue(r.headers.has_key('x-object-meta-mkey3'))
- self.assertEqual(r.headers['x-object-meta-mkey3'], 'mval3')
+ r = self.client.get_object_meta(obj)
+ self.assertEqual(r['mkey1'], 'mval1')
+ self.assertEqual(r['mkey2'], 'mval2a')
+ self.assertEqual(r['mkey3'], 'mval3')
+ self.client.reset_headers()
+
"""Check permitions"""
- self.assertFalse('read' in r.headers['x-object-sharing'])
- self.assertFalse('u2' in r.headers['x-object-sharing'])
- self.assertTrue('write' in r.headers['x-object-sharing'])
- self.assertTrue('accx:groupb' in r.headers['x-object-sharing'])
+ r = self.client.get_object_sharing(obj)
+ self.assertFalse(r.has_key('read') or 'u2' in r['write'])
+ self.assertTrue('accx:groupb' in r['write'])
self.client.reset_headers()
"""Check destination account"""
- r = self.client.object_copy(obj, destination='/testCo/'+obj, content_encoding='utf8',
+ r = self.client.object_copy(obj, destination='/%s/%s'%(self.c1,obj), content_encoding='utf8',
content_type='application/json', destination_account='nonExistendAddress@NeverLand.com',
success=(201, 403))
self.assertEqual(r.status_code, 403)
self.client.reset_headers()
- """Check destination being another container and also content_type and content encoding"""
- r = self.client.object_copy(obj, destination='/testCo/'+obj, content_encoding='utf8', content_type='application/json')
+ """Check destination being another container
+ and also content_type and content encoding"""
+ r = self.client.object_copy(obj, destination='/%s/%s'%(self.c1,obj),
+ content_encoding='utf8', content_type='application/json')
self.assertEqual(r.status_code, 201)
self.assertEqual(r.headers['content-type'], 'application/json; charset=UTF-8')
self.client.reset_headers()
- r = self.client.container='testCo'
- self.client.delete_object(obj)
- r = self.client.container='testCo0'
- self.client.reset_headers()
"""Check ignore_content_type and content_type"""
r = self.client.object_get(obj)
ctype = r.headers['content-type']
self.assertEqual(ctype, 'application/json')
self.client.reset_headers()
- r = self.client.object_copy(obj+'orig', destination = '/'+self.client.container+'/'+obj+'0',
+ r = self.client.object_copy(obj+'orig',
+ destination = '/'+self.client.container+'/'+obj+'0',
ignore_content_type=True, content_type='application/json')
self.assertEqual(r.status_code, 201)
self.assertNotEqual(r.headers['content-type'], 'application/json')
self.client.reset_headers()
"""Check if_etag_(not_)match"""
- r = self.client.object_copy(obj, destination='/'+self.client.container+'/'+obj+'1', if_etag_match=etag)
+ r = self.client.object_copy(obj,
+ destination='/'+self.client.container+'/'+obj+'1', if_etag_match=etag)
self.assertEqual(r.status_code, 201)
self.client.reset_headers()
- r = self.client.object_copy(obj, destination='/'+self.client.container+'/'+obj+'2', if_etag_not_match='lalala')
+ r = self.client.object_copy(obj,
+ destination='/'+self.client.container+'/'+obj+'2', if_etag_not_match='lalala')
self.assertEqual(r.status_code, 201)
vers2 = r.headers['x-object-version']
self.client.reset_headers()
self.assertEqual(r.status_code, 201)
self.assertTrue(r.headers['content-type'].index('xml') > 0)
self.client.reset_headers()
- r = self.client.object_get(obj+'3')
- self.assertTrue(r.headers.has_key('x-object-public'))
+ r = self.client.get_object_info(obj+'3')
+ self.assertTrue(r.has_key('x-object-public'))
self.client.reset_headers()
- """Still untested: content_disposition, manifest"""
-
- self.client.delete_object(obj)
- self.client.delete_object(obj+'0')
- self.client.delete_object(obj+'1')
- self.client.delete_object(obj+'2')
- self.client.delete_object(obj+'3')
- self.client.delete_object(obj+'orig')
- self.client.container = ''
+ """Still untested: manifest"""
def atest_object_move(self):
self.client.container='testCo0'
self.client.set_object_sharing(obj, read_permition=['u4', 'u5'], write_permition=['u4'])
self.client.reset_headers()
r = self.client.get_object_sharing(obj)
- self.assertTrue(r.has_key('x-object-sharing'))
- val = r['x-object-sharing']
- self.assertTrue(val.index('read') < val.index('u5') < val.index('write') < val.index('u4'))
+ self.assertTrue(r.has_key('read'))
+ self.assertTrue(r['read'].index('u5') >= 0)
+ self.assertTrue(r.has_key('write'))
+ self.assertTrue(r['write'].index('u4') >= 0)
self.client.reset_headers()
self.client.del_object_sharing(obj)
r = self.client.get_object_sharing(obj)
- self.assertFalse(r.has_key('x-object-sharing'))
+ self.assertTrue(len(r) == 0)
self.client.reset_headers()
"""Check publish"""
suiteFew.addTest(testPithos('test_container_get'))
suiteFew.addTest(testPithos('test_container_put'))
suiteFew.addTest(testPithos('test_container_post'))
- """
suiteFew.addTest(testPithos('test_container_delete'))
- """
suiteFew.addTest(testPithos('test_object_head'))
suiteFew.addTest(testPithos('test_object_get'))
suiteFew.addTest(testPithos('test_object_put'))
+ """
suiteFew.addTest(testPithos('test_object_copy'))
+ """
suiteFew.addTest(testPithos('test_object_move'))
suiteFew.addTest(testPithos('test_object_post'))
suiteFew.addTest(testPithos('test_object_delete'))