path = path4url(self.account)
r = self.head(path, success=(204, 401))
if r.status_code == 401:
- r.release()
raise ClientError("No authorization")
reply = r.headers
- r.release()
return reply
def replace_account_meta(self, metapairs):
    """Send account metadata as ``X-Account-Meta-<key>`` headers.

    One header is set per pair, then a single POST is issued on the
    account path with ``success=202``.

    :param metapairs: iterable of ``(key, value)`` pairs; for a mapping,
        pass its ``items()``
    """
    # The account path was used below without ever being bound; build it
    # the same way the other account-level calls in this file do.
    path = path4url(self.account)
    for key, val in metapairs:
        self.set_header('X-Account-Meta-' + key, val)
    r = self.post(path, success=202)
    r.release()
def del_account_meta(self, metakey):
    """Clear the ``X-Account-Meta-<metakey>`` entry of the account.

    The account headers are fetched first; if the entry is absent a
    ``ClientError`` is raised, otherwise the header is posted with an
    empty value on the account path.

    :param metakey: metadata key, without the ``X-Account-Meta-`` prefix
    :raises ClientError: (404) when the account has no such entry
    """
    header = 'X-Account-Meta-%s' % metakey
    headers = self.get_account_info()
    # Compare case-insensitively: whether the header mapping normalises
    # key case is not visible from here.
    wanted = header.lower()
    if not any(str(name).lower() == wanted for name in headers):
        raise ClientError('X-Account-Meta-%s not found' % metakey, 404)
    # Posting the header with an empty value mirrors how object metadata
    # is cleared elsewhere in this file.
    # NOTE(review): confirm the server treats an empty value as removal.
    self.set_header(header, '')
    path = path4url(self.account)
    r = self.post(path, success=202)
    r.release()
def create_container(self, container):
    """PUT a new container under the current account.

    :param container: name of the container to create
    :raises ClientError: with status 202 when the server answers 202,
        which this client reports as "Container already exists"
    """
    self.assert_account()
    path = path4url(self.account, container)
    r = self.put(path, success=(201, 202))
    try:
        if r.status_code == 202:
            raise ClientError("Container already exists", r.status_code)
    finally:
        # Release on every exit path, including the error one.
        r.release()
def get_container_info(self, container):
    """Return the response headers of a HEAD on the container path.

    :param container: name of the container to inspect
    :returns: ``r.headers`` of the HEAD response
    :raises ClientError: with status 404 when the container is missing
    """
    self.assert_account()
    path = path4url(self.account, container)
    r = self.head(path, success=(204, 404))
    try:
        if r.status_code == 404:
            raise ClientError("Container does not exist", r.status_code)
        # Headers are read before the response is released.
        return r.headers
    finally:
        r.release()
def delete_container(self, container):
    """DELETE a container of the current account.

    :param container: name of the container to delete
    :raises ClientError: with status 404 when the container is missing,
        or with status 409 when it is not empty
    """
    path = path4url(self.account, container)
    r = self.delete(path, success=(204, 404, 409))
    try:
        if r.status_code == 404:
            raise ClientError("Container does not exist", r.status_code)
        if r.status_code == 409:
            raise ClientError("Container is not empty", r.status_code)
    finally:
        # Release on every exit path, including both error ones.
        r.release()
def list_containers(self):
    """Return the decoded body of a GET on the account path.

    :returns: ``r.json`` of the response (statuses 200 and 204 are
        passed as acceptable)
    """
    self.assert_account()
    path = path4url(self.account)
    r = self.get(path, success=(200, 204))
    try:
        # The body is read before the response is released.
        return r.json
    finally:
        r.release()
def upload_object(self, object, f, size=None):
    """PUT the contents of a file-like object as *object*.

    :param object: object name inside ``self.container``
    :param f: file-like object; its ``read`` method supplies the data
    :param size: number of bytes to read from *f*; when ``None`` the
        whole remainder of *f* is read
    """
    path = path4url(self.account, self.container, object)
    data = f.read() if size is None else f.read(size)
    r = self.put(path, data=data, success=201)
    r.release()
def create_directory(self, object):
    """PUT an empty ``application/directory`` object named *object*.

    :param object: name of the directory object inside ``self.container``
    """
    self.assert_container()
    # The target path was used below without ever being bound; build it
    # the same way the other object-level calls in this file do.
    path = path4url(self.account, self.container, object)
    self.set_header('Content-Type', 'application/directory')
    self.set_header('Content-length', '0')
    r = self.put(path, success=201)
    r.release()
def get_object_info(self, object):
    """Return the response headers of a HEAD on the object path.

    :param object: object name inside ``self.container``
    :returns: ``r.headers`` of the HEAD response
    """
    self.assert_container()
    path = path4url(self.account, self.container, object)
    r = self.head(path, success=200)
    try:
        # Headers are read before the response is released.
        return r.headers
    finally:
        r.release()
def get_object_meta(self, object):
    """Post ``X-Object-Meta-<metakey>`` with an empty value on the object path.

    :param object: object name inside ``self.container``
    """
    # NOTE(review): `metakey` is neither a parameter nor a local of this
    # method, so the next line raises NameError as written. The method
    # name says "get" while the body sets a header and returns nothing --
    # this looks like two methods fused together with lines missing in
    # between; confirm against the complete file.
    self.set_header('X-Object-Meta-'+metakey, '')
    path = path4url(self.account, self.container, object)
    r = self.post(path, success = 202)
    # NOTE(review): the leading '-' makes this a unary negation of
    # release()'s result; it looks like diff-marker residue -- confirm.
    - r.release()
def replace_object_meta(self, metapairs):
    """Send ``X-Object-Meta-<key>`` headers in one POST with ``success=202``.

    :param metapairs: iterable of ``(key, value)`` pairs
    """
    self.assert_container()
    # One header per (key, value) pair.
    for key, val in metapairs:
        self.set_header('X-Object-Meta-'+key, val)
    # NOTE(review): `path` is never bound in this method and there is no
    # object parameter to build it from, so this raises NameError as
    # written; the intended target path needs confirming.
    r = self.post(path, success=202)
    # NOTE(review): the leading '-' makes this a unary negation of
    # release()'s result; it looks like diff-marker residue -- confirm.
    - r.release()
def get_object(self, object):
    """GET an object and return its content together with its size.

    :param object: object name inside ``self.container``
    :returns: ``(content, size)`` where *content* is ``r.content`` and
        *size* is the integer value of the ``content-length`` header
    """
    self.assert_container()
    # The object path was used below without ever being bound; build it
    # the same way the other object-level calls in this file do.
    path = path4url(self.account, self.container, object)
    r = self.get(path, success=200)
    try:
        size = int(r.headers['content-length'])
        cnt = r.content
    finally:
        # Release even if the header is missing or malformed.
        r.release()
    return cnt, size
def copy_object(self, src_container, src_object, dst_container,
                dst_object=False):
    """PUT a destination object with ``X-Copy-From`` naming the source.

    :param src_container: container holding the source object
    :param src_object: name of the source object
    :param dst_container: container receiving the copy
    :param dst_object: name of the copy; a falsy value (the default)
        reuses *src_object*
    """
    # The destination path was used below without ever being bound.
    # NOTE(review): falling back to the source name when dst_object is
    # falsy is inferred from the ``False`` default -- confirm.
    dst_path = path4url(self.account, dst_container,
                        dst_object or src_object)
    self.set_header('X-Copy-From', path4url(src_container, src_object))
    self.set_header('Content-Length', 0)
    r = self.put(dst_path, success=201)
    r.release()
def move_object(self, src_container, src_object, dst_container,
                dst_object=False):
    """PUT a destination object with ``X-Move-From`` naming the source.

    :param src_container: container holding the source object
    :param src_object: name of the source object
    :param dst_container: container receiving the object
    :param dst_object: new object name; a falsy value (the default)
        reuses *src_object*
    """
    self.assert_account()
    # The destination path was used below without ever being bound.
    # NOTE(review): falling back to the source name when dst_object is
    # falsy is inferred from the ``False`` default -- confirm.
    dst_path = path4url(self.account, dst_container,
                        dst_object or src_object)
    self.set_header('X-Move-From', path4url(src_container, src_object))
    self.set_header('Content-Length', 0)
    r = self.put(dst_path, success=201)
    r.release()
def delete_object(self, object):
    """DELETE an object of ``self.container``.

    :param object: name of the object to delete
    :raises ClientError: with status 404 when the object is not found
    """
    self.assert_container()
    path = path4url(self.account, self.container, object)
    r = self.delete(path, success=(204, 404))
    try:
        if r.status_code == 404:
            raise ClientError("Object %s not found" % object,
                              r.status_code)
    finally:
        # Release on every exit path, including the error one.
        r.release()
def list_objects(self):
    """Return the decoded body of a GET on the container path.

    The ``format`` request parameter is set to ``json`` first.

    :returns: ``[]`` on a 304 response, otherwise ``r.json``
    :raises ClientError: with status 404 when the server answers 404
    """
    self.assert_container()
    # The container path was used below without ever being bound.
    path = path4url(self.account, self.container)
    self.set_param('format', 'json')
    r = self.get(path, success=(200, 204, 304, 404))
    try:
        if r.status_code == 404:
            raise ClientError(
                "Incorrect account (%s) for that container" % self.account,
                r.status_code)
        if r.status_code == 304:
            return []
        # The body is read before the response is released.
        return r.json
    finally:
        r.release()
def list_objects_in_path(self, path_prefix):
    """GET the container path with the ``path`` parameter set.

    :param path_prefix: value sent as the ``path`` request parameter
    :returns: ``r.json`` of the response
    :raises ClientError: with status 404 when the server answers 404
    """
    # The container path was used below without ever being bound.
    path = path4url(self.account, self.container)
    # Pass the argument itself; the quoted literal 'path_prefix' was
    # being sent instead, so the caller's value was ignored.
    self.set_param('path', path_prefix)
    r = self.get(path, success=(200, 204, 404))
    try:
        if r.status_code == 404:
            raise ClientError(
                "Incorrect account (%s) for that container" % self.account,
                r.status_code)
        # The body is read before the response is released.
        return r.json
    finally:
        r.release()