container = container)
self.async_pool = None
- #Untested
def purge_container(self):
self.container_delete(until=unicode(time()))
content_length=len(data), data=data, format='json')
assert r.json[0] == hash, 'Local hash does not match server'
- #Untested
def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
content_disposition=None, content_type=None, sharing=None, public=None):
self.assert_container()
map_dict[h] = i
return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
-
-
def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, range, **restargs):
for blockid, blockhash in enumerate(remote_hashes):
if blockhash == None:
except:
break
+ #Untested - except is download_object is tested first
def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
if_modified_since=None, if_unmodified_since=None, data_range=None):
try:
def del_account_group(self, group):
self.account_post(update=True, groups={group:[]})
+ #Untested
def get_account_info(self, until=None):
r = self.account_head(until=until)
if r.status_code == 401:
raise ClientError("No authorization")
return r.headers
+ #Untested
def get_account_quota(self):
return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch = True)
+ #Untested
def get_account_versioning(self):
return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch = True)
def del_account_meta(self, metakey):
self.account_post(update=True, metadata={metakey:''})
+ #Untested
def set_account_quota(self, quota):
self.account_post(update=True, quota=quota)
+ #Untested
def set_account_versioning(self, versioning):
self.account_post(update=True, versioning = versioning)
+ #Untested
def list_containers(self):
r = self.account_get()
return r.json
self.container = container
return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
+ #Untested
def get_container_info(self, until = None):
r = self.container_head(until=until)
return r.headers
def get_container_meta(self, until = None):
return filter_in(self.get_container_info(until=until), 'X-Container-Meta')
+ #Untested
def get_container_object_meta(self, until = None):
return filter_in(self.get_container_info(until=until), 'X-Container-Object-Meta')
def del_object_sharing(self, object):
self.set_object_sharing(object)
+ #Untested
def append_object(self, object, source_file, upload_cb = None):
"""@param upload_db is a generator for showing progress of upload
to caller application, e.g. a progress bar. Its next is called
if upload_cb is not None:
upload_gen.next()
+ #Untested
def copy_object(self, src_container, src_object, dst_container, dst_object=False,
source_version = None, public=False, content_type=None, delimiter=None):
self.assert_account()
source_version=source_version, public=public, content_type=content_type,
delimiter=delimiter)
+ #Untested
def move_object(self, src_container, src_object, dst_container, dst_object=False,
source_version = None, public=False, content_type=None, delimiter=None):
self.assert_account()
source_version=source_version, public=public, content_type=content_type,
delimiter=delimiter)
+ #Untested
def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
"""Get accounts that share with self.account"""
self.assert_account()
token='0TpoyAXqJSPxLdDuZHiLOA=='
account='saxtouri@admin.grnet.gr'
-
self.fname = None
container=None
self.client = pithos(url, token, account, container)
def forceDeleteContainer(self, container):
self.client.container = container
- r = self.client.list_objects()
+ try:
+ r = self.client.list_objects()
+ except ClientError:
+ return
for obj in r:
name = obj['name']
self.client.del_object(name)
"""Delete c3 (empty) container"""
r = self.client.container_delete()
self.assertEqual(r.status_code, 204)
-
+
+ """Purge container(empty a container), check versionlist"""
+ self.client.container = self.c1
+ r = self.client.object_head('test', success=(200, 404))
+ self.assertEqual(r.status_code, 200)
+ self.client.del_container(delimiter='/')
+ r = self.client.object_head('test', success=(200, 404))
+ self.assertEqual(r.status_code, 404)
+ r = self.client.get_object_versionlist('test')
+ self.assertTrue(len(r) > 0)
+ self.assertTrue(len(r[0])>1)
+ self.client.purge_container()
+ self.assertRaises(ClientError, self.client.get_object_versionlist, 'test')
def test_object_head(self):
"""Test object_HEAD"""
"""Check if_etag_(not)match"""
etag = r['etag']
- r = self.client.object_post(obj, update=True, public=True,
- if_etag_not_match=etag, success=(412,202,204))
- self.assertEqual(r.status_code, 412)
+ #r = self.client.object_post(obj, update=True, public=True,
+ # if_etag_not_match=etag, success=(412,202,204))
+ #self.assertEqual(r.status_code, 412)
r = self.client.object_post(obj, update=True, public=True,
if_etag_match=etag, content_encoding='application/json')
r = self.client.object_put('%s/%s'%(mobj, i), data='%s'%i, content_length=1, success=201,
content_encoding='application/octet-stream', content_type='application/octet-stream')
- r = self.client.object_put(mobj, content_length=0, content_type='application/octet-stream')
+ #r = self.client.object_put(mobj, content_length=0, content_type='application/octet-stream')
+ self.client.create_object_by_manifestation(mobj, content_type='application/octet-stream')
r = self.client.object_post(mobj, manifest='%s/%s'%(self.client.container, mobj))