from os import fstat
from hashlib import new as newhashlib
from time import time
+from StringIO import StringIO
from binascii import hexlify
from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
-from kamaki.clients.utils import path4url, filter_in
-from StringIO import StringIO
+from kamaki.clients.utils import path4url, filter_in, readall
def _pithos_hash(block, blockhash):
return h.hexdigest()
-def _range_up(start, end, a_range):
- if a_range:
- (rstart, rend) = a_range.split('-')
- (rstart, rend) = (int(rstart), int(rend))
- if rstart > end or rend < start:
- return (0, 0)
- if rstart > start:
- start = rstart
- if rend < end:
- end = rend
- return (start, end)
+def _range_up(start, end, max_value, a_range):
+ """
+ :param start: (int) the window bottom
+
+ :param end: (int) the window top
+
+ :param max_value: (int) maximum accepted value
+
+ :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
+ where X: x|x-y|-x where x < y and x, y natural numbers
+
+ :returns: (str) a range string cut off to fit the start-end window;
+ an empty string means this window is entirely out of range
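+
+ Examples (illustrative, for a 100-byte object, i.e. max_value=100):
+ _range_up(0, 49, 100, '10-20,-30') --> '10-20'
+ _range_up(50, 99, 100, '10-20,-30') --> '70-99'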
+ """
+ assert start >= 0, '_range_up called w. start(%s) < 0' % start
+ assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
+ end, start)
+ assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
+ max_value, end)
+ if not a_range:
+ return '%s-%s' % (start, end)
+ selected = []
+ for some_range in a_range.split(','):
+ v0, sep, v1 = some_range.partition('-')
+ if v0:
+ v0 = int(v0)
+ if sep:
+ v1 = int(v1)
+ if v1 < start or v0 > end or v1 < v0:
+ continue
+ v0 = v0 if v0 > start else start
+ v1 = v1 if v1 < end else end
+ selected.append('%s-%s' % (v0, v1))
+ elif v0 < start:
+ continue
+ else:
+ v1 = v0 if v0 <= end else end
+ selected.append('%s-%s' % (start, v1))
+ else:
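+ # a '-x' range: the last x bytes of a max_value-sized object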
+ v1 = int(v1)
+ if max_value - v1 > end:
+ continue
+ v0 = (max_value - v1) if max_value - v1 > start else start
+ selected.append('%s-%s' % (v0, end))
+ return ','.join(selected)
class PithosClient(PithosRestClient):
def __init__(self, base_url, token, account=None, container=None):
super(PithosClient, self).__init__(base_url, token, account, container)
- def purge_container(self, container=None):
- """Delete an empty container and destroy associated blocks
+ def create_container(
+ self,
+ container=None, sizelimit=None, versioning=None, metadata=None,
+ **kwargs):
+ """
+ :param container: (str) if not given, self.container is used instead
+
+ :param sizelimit: (int) container total size limit in bytes
+
+ :param versioning: (str) 'auto' or any other value supported by the server
+
+ :param metadata: (dict) Custom user-defined metadata of the form
+ { 'name1': 'value1', 'name2': 'value2', ... }
+
+ :returns: (dict) response headers
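+
+ Example (illustrative; the name and 10MB limit are arbitrary,
+ client is an initialized PithosClient):
+ client.create_container('pics', sizelimit=10 * 1024 ** 2)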
"""
cnt_back_up = self.container
try:
self.container = container or cnt_back_up
- self.container_delete(until=unicode(time()))
+ r = self.container_put(
+ quota=sizelimit, versioning=versioning, metadata=metadata,
+ **kwargs)
+ return r.headers
finally:
self.container = cnt_back_up
+ def purge_container(self, container=None):
+ """Delete an empty container and destroy associated blocks"""
+ cnt_back_up = self.container
+ try:
+ self.container = container or cnt_back_up
+ r = self.container_delete(until=unicode(time()))
+ finally:
+ self.container = cnt_back_up
+ return r.headers
+
def upload_object_unchunked(
self, obj, f,
withHashFile=False,
raise ClientError(msg, 1)
f = StringIO(data)
else:
- data = f.read(size) if size else f.read()
+ data = readall(f, size) if size else f.read()
r = self.object_put(
obj,
data=data,
return r.headers
# upload_* auxiliary methods
- def _put_block_async(self, data, hash, upload_gen=None):
+ def _put_block_async(self, data, hash):
event = SilentEvent(method=self._put_block, data=data, hash=hash)
event.start()
return event
nblocks = 1 + (size - 1) // blocksize
return (blocksize, blockhash, size, nblocks)
- def _create_or_get_missing_hashes(
+ def _create_object_or_get_missing_hashes(
self, obj, json,
size=None,
format='json',
hash_gen = hash_cb(nblocks)
hash_gen.next()
- for i in range(nblocks):
- block = fileobj.read(min(blocksize, size - offset))
+ for i in xrange(nblocks):
+ block = readall(fileobj, min(blocksize, size - offset))
bytes = len(block)
+ if bytes <= 0:
+ break
hash = _pithos_hash(block, blockhash)
hashes.append(hash)
hmap[hash] = (offset, bytes)
offset += bytes
if hash_cb:
hash_gen.next()
- msg = 'Failed to calculate uploaded blocks:'
- ' Offset and object size do not match'
+ msg = ('Failed to calculate blocks for upload: '
+ 'read bytes (%s) != requested size (%s)' % (offset, size))
assert offset == size, msg
def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
for hash in missing:
offset, bytes = hmap[hash]
fileobj.seek(offset)
- data = fileobj.read(bytes)
- r = self._put_block_async(data, hash, upload_gen)
+ data = readall(fileobj, bytes)
+ r = self._put_block_async(data, hash)
flying.append(r)
unfinished = self._watch_thread_limit(flying)
for thread in set(flying).difference(unfinished):
:param public: (bool)
:param container_info_cache: (dict) if given, avoid redundant calls to
- server for container info (block size and hash information)
+ server for container info (block size and hash information)
"""
self._assert_container()
- #init
- block_info = (blocksize, blockhash, size, nblocks) =\
- self._get_file_block_info(f, size, container_info_cache)
+ block_info = (
+ blocksize, blockhash, size, nblocks) = self._get_file_block_info(
+ f, size, container_info_cache)
(hashes, hmap, offset) = ([], {}, 0)
if not content_type:
content_type = 'application/octet-stream'
hash_cb=hash_cb)
hashmap = dict(bytes=size, hashes=hashes)
- missing, obj_headers = self._create_or_get_missing_hashes(
+ missing, obj_headers = self._create_object_or_get_missing_hashes(
obj, hashmap,
content_type=content_type,
size=size,
else:
break
if missing:
+ try:
+ details = ['%s' % thread.exception for thread in missing]
+ except Exception:
+ details = ['Also, failed to read thread exceptions']
+ raise ClientError(
+ '%s blocks failed to upload' % len(missing),
+ details=details)
+ except KeyboardInterrupt:
+ sendlog.info('- - - wait for threads to finish')
+ for thread in activethreads():
+ thread.join()
+ raise
+
+ r = self.object_put(
+ obj,
+ format='json',
+ hashmap=True,
+ content_type=content_type,
+ content_encoding=content_encoding,
+ if_etag_match=if_etag_match,
+ if_etag_not_match='*' if if_not_exist else None,
+ etag=etag,
+ json=hashmap,
+ permissions=sharing,
+ public=public,
+ success=201)
+ return r.headers
+
+ def upload_from_string(
+ self, obj, input_str,
+ hash_cb=None,
+ upload_cb=None,
+ etag=None,
+ if_etag_match=None,
+ if_not_exist=None,
+ content_encoding=None,
+ content_disposition=None,
+ content_type=None,
+ sharing=None,
+ public=None,
+ container_info_cache=None):
+ """Upload an object using multiple connections (threads)
+
+ :param obj: (str) remote object path
+
+ :param input_str: (str) upload content
+
+ :param hash_cb: optional progress.bar object for calculating hashes
+
+ :param upload_cb: optional progress.bar object for uploading
+
+ :param etag: (str)
+
+ :param if_etag_match: (str) Push that value to if-match header at file
+ creation
+
+ :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
+ it does not exist remotely, otherwise the operation will fail.
+ This covers the case where an object with the same path is created
+ while the object is being uploaded.
+
+ :param content_encoding: (str)
+
+ :param content_disposition: (str)
+
+ :param content_type: (str)
+
+ :param sharing: {'read':[user and/or grp names],
+ 'write':[usr and/or grp names]}
+
+ :param public: (bool)
+
+ :param container_info_cache: (dict) if given, avoid redundant calls to
+ server for container info (block size and hash information)
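+
+ Example (a minimal sketch; object name and content are arbitrary):
+ client.upload_from_string('note.txt', 'hello world')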
+ """
+ self._assert_container()
+
+ blocksize, blockhash, size, nblocks = self._get_file_block_info(
+ fileobj=None, size=len(input_str), cache=container_info_cache)
+ (hashes, hmap, offset) = ([], {}, 0)
+ if not content_type:
+ content_type = 'application/octet-stream'
+
+ for blockid in range(nblocks):
+ start = blockid * blocksize
+ block = input_str[start: (start + blocksize)]
+ hashes.append(_pithos_hash(block, blockhash))
+ hmap[hashes[blockid]] = (start, block)
+
+ hashmap = dict(bytes=size, hashes=hashes)
+ missing, obj_headers = self._create_object_or_get_missing_hashes(
+ obj, hashmap,
+ content_type=content_type,
+ size=size,
+ if_etag_match=if_etag_match,
+ if_etag_not_match='*' if if_not_exist else None,
+ content_encoding=content_encoding,
+ content_disposition=content_disposition,
+ permissions=sharing,
+ public=public)
+ if missing is None:
+ return obj_headers
+ num_of_missing = len(missing)
+
+ if upload_cb:
+ self.progress_bar_gen = upload_cb(nblocks)
+ for i in range(nblocks + 1 - num_of_missing):
+ self._cb_next()
+
+ tries = 7
+ old_failures = 0
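+ # Retry accounting: a round that ends with as many failures as the
+ # previous one burns a try; rounds that make progress do not.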
+ try:
+ while tries and missing:
+ flying = []
+ failures = []
+ for hash in missing:
+ offset, block = hmap[hash]
+ bird = self._put_block_async(block, hash)
+ flying.append(bird)
+ unfinished = self._watch_thread_limit(flying)
+ for thread in set(flying).difference(unfinished):
+ if thread.exception:
+ failures.append(thread.kwargs['hash'])
+ if thread.isAlive():
+ flying.append(thread)
+ else:
+ self._cb_next()
+ flying = unfinished
+ for thread in flying:
+ thread.join()
+ if thread.exception:
+ failures.append(thread.kwargs['hash'])
+ self._cb_next()
+ missing = failures
+ if missing and len(missing) == old_failures:
+ tries -= 1
+ old_failures = len(missing)
+ if missing:
raise ClientError(
'%s blocks failed to upload' % len(missing),
- status=800)
+ details=['%s' % hash for hash in missing])
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
format='json',
hashmap=True,
content_type=content_type,
+ content_encoding=content_encoding,
if_etag_match=if_etag_match,
if_etag_not_match='*' if if_not_exist else None,
etag=etag,
return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
def _dump_blocks_sync(
- self, obj, remote_hashes, blocksize, total_size, dst, range,
+ self, obj, remote_hashes, blocksize, total_size, dst, crange,
**args):
+ if not total_size:
+ return
for blockid, blockhash in enumerate(remote_hashes):
if blockhash:
start = blocksize * blockid
is_last = start + blocksize > total_size
end = (total_size - 1) if is_last else (start + blocksize - 1)
- (start, end) = _range_up(start, end, range)
- args['data_range'] = 'bytes=%s-%s' % (start, end)
+ data_range = _range_up(start, end, total_size, crange)
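+ # an empty range string means this block lies entirely
+ # outside the requested range: step the progress bar
+ # and skip the download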
+ if not data_range:
+ self._cb_next()
+ continue
+ args['data_range'] = 'bytes=%s' % data_range
r = self.object_get(obj, success=(200, 206), **args)
self._cb_next()
dst.write(r.content)
def _hash_from_file(self, fp, start, size, blockhash):
fp.seek(start)
- block = fp.read(size)
+ block = readall(fp, size)
h = newhashlib(blockhash)
h.update(block.strip('\x00'))
return hexlify(h.digest())
- e.g. if the range is 10-100, all blocks will be written to
- normal_position - 10
"""
- for i, (key, g) in enumerate(flying.items()):
+ for key, g in flying.items():
if g.isAlive():
continue
if g.exception:
flying = dict()
blockid_dict = dict()
offset = 0
- if filerange is not None:
- rstart = int(filerange.split('-')[0])
- offset = rstart if blocksize > rstart else rstart % blocksize
self._init_thread_limit()
for block_hash, blockids in remote_hashes.items():
self._thread2file(
flying, blockid_dict, local_file, offset,
**restargs)
- end = total_size - 1 if key + blocksize > total_size\
- else key + blocksize - 1
- start, end = _range_up(key, end, filerange)
- if start == end:
+ end = total_size - 1 if (
+ key + blocksize > total_size) else key + blocksize - 1
+ if end < key:
+ self._cb_next()
+ continue
+ data_range = _range_up(key, end, total_size, filerange)
+ if not data_range:
self._cb_next()
continue
- restargs['async_headers'] = {
- 'Range': 'bytes=%s-%s' % (start, end)}
+ restargs['async_headers'] = {'Range': 'bytes=%s' % data_range}
flying[key] = self._get_block_async(obj, **restargs)
blockid_dict[key] = unsaved
self._complete_cb()
+ def download_to_string(
+ self, obj,
+ download_cb=None,
+ version=None,
+ range_str=None,
+ if_match=None,
+ if_none_match=None,
+ if_modified_since=None,
+ if_unmodified_since=None):
+ """Download an object to a string (multiple connections). This method
+ uses threads for http requests, but stores all content in memory.
+
+ :param obj: (str) remote object path
+
+ :param download_cb: optional progress.bar object for downloading
+
+ :param version: (str) file version
+
+ :param range_str: (str) from-to, where from and to are file positions
+ (int) in bytes
+
+ :param if_match: (str)
+
+ :param if_none_match: (str)
+
+ :param if_modified_since: (str) formatted date
+
+ :param if_unmodified_since: (str) formatted date
+
+ :returns: (str) the whole object contents
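+
+ Example (illustrative; fetches only the first kilobyte):
+ text = client.download_to_string('note.txt', range_str='0-1023')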
+ """
+ restargs = dict(
+ version=version,
+ data_range=None if range_str is None else 'bytes=%s' % range_str,
+ if_match=if_match,
+ if_none_match=if_none_match,
+ if_modified_since=if_modified_since,
+ if_unmodified_since=if_unmodified_since)
+
+ (
+ blocksize,
+ blockhash,
+ total_size,
+ hash_list,
+ remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
+ assert total_size >= 0
+
+ if download_cb:
+ self.progress_bar_gen = download_cb(len(hash_list))
+ self._cb_next()
+
+ num_of_blocks = len(remote_hashes)
+ ret = [''] * num_of_blocks
+ self._init_thread_limit()
+ flying = dict()
+ try:
+ for blockid, blockhash in enumerate(remote_hashes):
+ start = blocksize * blockid
+ is_last = start + blocksize > total_size
+ end = (total_size - 1) if is_last else (start + blocksize - 1)
+ data_range_str = _range_up(start, end, total_size, range_str)
+ if data_range_str:
+ self._watch_thread_limit(flying.values())
+ restargs['data_range'] = 'bytes=%s' % data_range_str
+ flying[blockid] = self._get_block_async(obj, **restargs)
+ for runid, thread in flying.items():
+ if (blockid + 1) == num_of_blocks:
+ thread.join()
+ elif thread.isAlive():
+ continue
+ if thread.exception:
+ raise thread.exception
+ ret[runid] = thread.value.content
+ self._cb_next()
+ flying.pop(runid)
+ return ''.join(ret)
+ except KeyboardInterrupt:
+ sendlog.info('- - - wait for threads to finish')
+ for thread in activethreads():
+ thread.join()
+ raise
+
#Command Progress Bar method
def _cb_next(self, step=1):
if hasattr(self, 'progress_bar_gen'):
if_match=None,
if_none_match=None,
if_modified_since=None,
- if_unmodified_since=None,
- data_range=None):
+ if_unmodified_since=None):
"""
:param obj: (str) remote object path
- :param if_unmodified_since: (str) formated date
+ :param if_unmodified_since: (str) formatted date
- :param data_range: (str) from-to where from and to are integers
- denoting file positions in bytes
-
- :returns: (list)
+ :returns: (dict)
"""
try:
if_etag_match=if_match,
if_etag_not_match=if_none_match,
if_modified_since=if_modified_since,
- if_unmodified_since=if_unmodified_since,
- data_range=data_range)
+ if_unmodified_since=if_unmodified_since)
except ClientError as err:
if err.status == 304 or err.status == 412:
return {}
:param usernames: (list)
"""
- self.account_post(update=True, groups={group: usernames})
+ r = self.account_post(update=True, groups={group: usernames})
+ return r.headers
def del_account_group(self, group):
"""
'X-Account-Policy-Quota',
exactMatch=True)
- def get_account_versioning(self):
- """
- :returns: (dict)
- """
- return filter_in(
- self.get_account_info(),
- 'X-Account-Policy-Versioning',
- exactMatch=True)
+ #def get_account_versioning(self):
+ # """
+ # :returns: (dict)
+ # """
+ # return filter_in(
+ # self.get_account_info(),
+ # 'X-Account-Policy-Versioning',
+ # exactMatch=True)
def get_account_meta(self, until=None):
"""
- :meta until: (str) formated date
+ :param until: (str) formatted date
:returns: (dict)
"""
:param metapairs: (dict) {key1:val1, key2:val2, ...}
"""
assert(type(metapairs) is dict)
- self.account_post(update=True, metadata=metapairs)
+ r = self.account_post(update=True, metadata=metapairs)
+ return r.headers
def del_account_meta(self, metakey):
"""
:param metakey: (str) metadatum key
"""
- self.account_post(update=True, metadata={metakey: ''})
+ r = self.account_post(update=True, metadata={metakey: ''})
+ return r.headers
- """
- def set_account_quota(self, quota):
- ""
- :param quota: (int)
- ""
- self.account_post(update=True, quota=quota)
- """
+ #def set_account_quota(self, quota):
+ # """
+ # :param quota: (int)
+ # """
+ # self.account_post(update=True, quota=quota)
- def set_account_versioning(self, versioning):
- """
- "param versioning: (str)
- """
- self.account_post(update=True, versioning=versioning)
+ #def set_account_versioning(self, versioning):
+ # """
+ # :param versioning: (str)
+ # """
+ # r = self.account_post(update=True, versioning=versioning)
+ # return r.headers
def list_containers(self):
"""
raise ClientError(
'Container "%s" is not empty' % self.container,
r.status_code)
+ return r.headers
def get_container_versioning(self, container=None):
"""
:param metapairs: (dict) {key1:val1, key2:val2, ...}
"""
assert(type(metapairs) is dict)
- self.container_post(update=True, metadata=metapairs)
+ r = self.container_post(update=True, metadata=metapairs)
+ return r.headers
def del_container_meta(self, metakey):
"""
:param metakey: (str) metadatum key
+
+ :returns: (dict) response headers
"""
- self.container_post(update=True, metadata={metakey: ''})
+ r = self.container_post(update=True, metadata={metakey: ''})
+ return r.headers
def set_container_limit(self, limit):
"""
:param limit: (int)
"""
- self.container_post(update=True, quota=limit)
+ r = self.container_post(update=True, quota=limit)
+ return r.headers
def set_container_versioning(self, versioning):
"""
:param versioning: (str)
"""
- self.container_post(update=True, versioning=versioning)
+ r = self.container_post(update=True, versioning=versioning)
+ return r.headers
def del_object(self, obj, until=None, delimiter=None):
"""
:param delimiter: (str)
"""
self._assert_container()
- self.object_delete(obj, until=until, delimiter=delimiter)
+ r = self.object_delete(obj, until=until, delimiter=delimiter)
+ return r.headers
def set_object_meta(self, obj, metapairs):
"""
:param metapairs: (dict) {key1:val1, key2:val2, ...}
"""
assert(type(metapairs) is dict)
- self.object_post(obj, update=True, metadata=metapairs)
+ r = self.object_post(obj, update=True, metadata=metapairs)
+ return r.headers
def del_object_meta(self, obj, metakey):
"""
:param metakey: (str) metadatum key
"""
- self.object_post(obj, update=True, metadata={metakey: ''})
+ r = self.object_post(obj, update=True, metadata={metakey: ''})
+ return r.headers
def publish_object(self, obj):
"""
"""
self.object_post(obj, update=True, public=True)
info = self.get_object_info(obj)
+ return info['x-object-public']
- pref, sep, rest = self.base_url.partition('//')
- base = rest.split('/')[0]
- return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
"""
:param obj: (str) remote object path
"""
- self.object_post(obj, update=True, public=False)
+ r = self.object_post(obj, update=True, public=False)
+ return r.headers
def get_object_info(self, obj, version=None):
"""
def set_object_sharing(
self, obj,
- read_permition=False, write_permition=False):
+ read_permission=False, write_permission=False):
"""Give read/write permisions to an object.
:param obj: (str) remote object path
- :param read_permition: (list - bool) users and user groups that get
- read permition for this object - False means all previous read
+ :param read_permission: (list - bool) users and user groups that get
+ read permission for this object - False means all previous read
permissions will be removed
- :param write_perimition: (list - bool) of users and user groups to get
- write permition for this object - False means all previous write
+ :param write_permission: (list - bool) of users and user groups to get
+ write permission for this object - False means all previous write
permissions will be removed
+
+ :returns: (dict) response headers
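+
+ Example (illustrative; the user and group names are made up):
+ client.set_object_sharing(
+ 'doc.txt', read_permission=['user1', 'group1'])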
"""
- perms = dict(read=read_permition or '', write=write_permition or '')
- self.object_post(obj, update=True, permissions=perms)
+ perms = dict(read=read_permission or '', write=write_permission or '')
+ r = self.object_post(obj, update=True, permissions=perms)
+ return r.headers
def del_object_sharing(self, obj):
"""
:param obj: (str) remote object path
"""
- self.set_object_sharing(obj)
+ return self.set_object_sharing(obj)
def append_object(self, obj, source_file, upload_cb=None):
"""
- :param upload_db: progress.bar for uploading
+ :param upload_cb: progress.bar for uploading
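+
+ Example (a minimal sketch; names are arbitrary and the source must
+ be a real file, since its size is read with fstat):
+ with open('extra.bin', 'rb') as f:
+ client.append_object('data.bin', f)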
"""
-
self._assert_container()
meta = self.get_container_info()
blocksize = int(meta['x-container-block-size'])
filesize = fstat(source_file.fileno()).st_size
nblocks = 1 + (filesize - 1) // blocksize
offset = 0
+ headers = {}
if upload_cb:
- upload_gen = upload_cb(nblocks)
- upload_gen.next()
- for i in range(nblocks):
- block = source_file.read(min(blocksize, filesize - offset))
- offset += len(block)
- self.object_post(
- obj,
- update=True,
- content_range='bytes */*',
- content_type='application/octet-stream',
- content_length=len(block),
- data=block)
+ self.progress_bar_gen = upload_cb(nblocks)
+ self._cb_next()
+ flying = {}
+ self._init_thread_limit()
+ try:
+ for i in range(nblocks):
+ block = source_file.read(min(blocksize, filesize - offset))
+ offset += len(block)
- if upload_cb:
- upload_gen.next()
+ self._watch_thread_limit(flying.values())
+ unfinished = {}
+ flying[i] = SilentEvent(
+ method=self.object_post,
+ obj=obj,
+ update=True,
+ content_range='bytes */*',
+ content_type='application/octet-stream',
+ content_length=len(block),
+ data=block)
+ flying[i].start()
+
+ for key, thread in flying.items():
+ if thread.isAlive():
+ if i < nblocks - 1:
+ unfinished[key] = thread
+ continue
+ thread.join()
+ if thread.exception:
+ raise thread.exception
+ headers[key] = thread.value.headers
+ self._cb_next()
+ flying = unfinished
+ except KeyboardInterrupt:
+ sendlog.info('- - - wait for threads to finish')
+ for thread in activethreads():
+ thread.join()
+ finally:
+ from time import sleep
+ sleep(2 * len(activethreads()))
+ return headers.values()
def truncate_object(self, obj, upto_bytes):
"""
:param obj: (str) remote object path
:param upto_bytes: max number of bytes to leave on file
+
+ :returns: (dict) response headers
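+
+ Example (illustrative; keeps only the first 1024 bytes):
+ client.truncate_object('logs/app.log', 1024)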
"""
- self.object_post(
+ r = self.object_post(
obj,
update=True,
content_range='bytes 0-%s/*' % upto_bytes,
content_type='application/octet-stream',
object_bytes=upto_bytes,
source_object=path4url(self.container, obj))
+ return r.headers
def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
"""Overwrite a part of an object from local source file
nblocks = 1 + (datasize - 1) // blocksize
offset = 0
if upload_cb:
- upload_gen = upload_cb(nblocks)
- upload_gen.next()
+ self.progress_bar_gen = upload_cb(nblocks)
+ self._cb_next()
+ headers = []
for i in range(nblocks):
read_size = min(blocksize, filesize - offset, datasize - offset)
block = source_file.read(read_size)
- self.object_post(
+ r = self.object_post(
obj,
update=True,
content_type='application/octet-stream',
start + offset,
start + offset + len(block) - 1),
data=block)
+ headers.append(dict(r.headers))
offset += len(block)
- if upload_cb:
- upload_gen.next()
+ self._cb_next()
+ return headers
def copy_object(
self, src_container, src_object, dst_container,
:param content_type: (str)
:param delimiter: (str)
+
+ :returns: (dict) response headers
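+
+ Example (illustrative; container and object names are arbitrary):
+ client.copy_object('src_cnt', 'a.txt', 'dst_cnt', 'b.txt')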
"""
self._assert_account()
self.container = dst_container
src_path = path4url(src_container, src_object)
- self.object_put(
+ r = self.object_put(
dst_object or src_object,
success=201,
copy_from=src_path,
public=public,
content_type=content_type,
delimiter=delimiter)
+ return r.headers
def move_object(
self, src_container, src_object, dst_container,
:param content_type: (str)
:param delimiter: (str)
+
+ :returns: (dict) response headers
"""
self._assert_account()
self.container = dst_container
dst_object = dst_object or src_object
src_path = path4url(src_container, src_object)
- self.object_put(
+ r = self.object_put(
dst_object,
success=201,
move_from=src_path,
public=public,
content_type=content_type,
delimiter=delimiter)
+ return r.headers
def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
"""Get accounts that share with self.account