Update copyright dates for changed files
[kamaki] / kamaki / clients / pithos / __init__.py
index 6251e0f..4eb7153 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 2011-2013 GRNET S.A. All rights reserved.
+# Copyright 2011-2014 GRNET S.A. All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or
 # without modification, are permitted provided that the following
@@ -43,7 +43,7 @@ from binascii import hexlify
 from kamaki.clients import SilentEvent, sendlog
 from kamaki.clients.pithos.rest_api import PithosRestClient
 from kamaki.clients.storage import ClientError
-from kamaki.clients.utils import path4url, filter_in
+from kamaki.clients.utils import path4url, filter_in, readall
 
 
 def _pithos_hash(block, blockhash):
@@ -52,17 +52,51 @@ def _pithos_hash(block, blockhash):
     return h.hexdigest()
 
 
-def _range_up(start, end, a_range):
-    if a_range:
-        (rstart, rend) = a_range.split('-')
-        (rstart, rend) = (int(rstart), int(rend))
-        if rstart > end or rend < start:
-            return (0, 0)
-        if rstart > start:
-            start = rstart
-        if rend < end:
-            end = rend
-    return (start, end)
+def _range_up(start, end, max_value, a_range):
+    """
+    :param start: (int) the window bottom
+
+    :param end: (int) the window top
+
+    :param max_value: (int) maximum accepted value
+
+    :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
+        where X: x|x-y|-x where x < y and x, y natural numbers
+
+    :returns: (str) a range string cut-off for the start-end range
+        an empty response means this window is out of range
+    """
+    assert start >= 0, '_range_up called w. start(%s) < 0' % start
+    assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
+        end, start)
+    assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
+        max_value, end)
+    if not a_range:
+        return '%s-%s' % (start, end)
+    selected = []
+    for some_range in a_range.split(','):
+        v0, sep, v1 = some_range.partition('-')
+        if v0:
+            v0 = int(v0)
+            if sep:
+                v1 = int(v1)
+                if v1 < start or v0 > end or v1 < v0:
+                    continue
+                v0 = v0 if v0 > start else start
+                v1 = v1 if v1 < end else end
+                selected.append('%s-%s' % (v0, v1))
+            elif v0 < start:
+                continue
+            else:
+                v1 = v0 if v0 <= end else end
+                selected.append('%s-%s' % (start, v1))
+        else:
+            v1 = int(v1)
+            if max_value - v1 > end:
+                continue
+            v0 = (max_value - v1) if max_value - v1 > start else start
+            selected.append('%s-%s' % (v0, end))
+    return ','.join(selected)
 
 
 class PithosClient(PithosRestClient):
@@ -71,16 +105,42 @@ class PithosClient(PithosRestClient):
     def __init__(self, base_url, token, account=None, container=None):
         super(PithosClient, self).__init__(base_url, token, account, container)
 
-    def purge_container(self, container=None):
-        """Delete an empty container and destroy associated blocks
+    def create_container(
+            self,
+            container=None, sizelimit=None, versioning=None, metadata=None,
+            **kwargs):
+        """
+        :param container: (str) if not given, self.container is used instead
+
+        :param sizelimit: (int) container total size limit in bytes
+
+        :param versioning: (str) can be auto or whatever supported by server
+
+        :param metadata: (dict) Custom user-defined metadata of the form
+            { 'name1': 'value1', 'name2': 'value2', ... }
+
+        :returns: (dict) response headers
         """
         cnt_back_up = self.container
         try:
             self.container = container or cnt_back_up
-            self.container_delete(until=unicode(time()))
+            r = self.container_put(
+                quota=sizelimit, versioning=versioning, metadata=metadata,
+                **kwargs)
+            return r.headers
         finally:
             self.container = cnt_back_up
 
+    def purge_container(self, container=None):
+        """Delete an empty container and destroy associated blocks"""
+        cnt_back_up = self.container
+        try:
+            self.container = container or cnt_back_up
+            r = self.container_delete(until=unicode(time()))
+        finally:
+            self.container = cnt_back_up
+        return r.headers
+
     def upload_object_unchunked(
             self, obj, f,
             withHashFile=False,
@@ -129,7 +189,7 @@ class PithosClient(PithosRestClient):
                 raise ClientError(msg, 1)
             f = StringIO(data)
         else:
-            data = f.read(size) if size else f.read()
+            data = readall(f, size) if size else f.read()
         r = self.object_put(
             obj,
             data=data,
@@ -255,17 +315,19 @@ class PithosClient(PithosRestClient):
             hash_gen = hash_cb(nblocks)
             hash_gen.next()
 
-        for i in range(nblocks):
-            block = fileobj.read(min(blocksize, size - offset))
+        for i in xrange(nblocks):
+            block = readall(fileobj, min(blocksize, size - offset))
             bytes = len(block)
+            if bytes <= 0:
+                break
             hash = _pithos_hash(block, blockhash)
             hashes.append(hash)
             hmap[hash] = (offset, bytes)
             offset += bytes
             if hash_cb:
                 hash_gen.next()
-        msg = 'Failed to calculate uploaded blocks:'
-        ' Offset and object size do not match'
+        msg = ('Failed to calculate uploading blocks: '
+               'read bytes(%s) != requested size (%s)' % (offset, size))
         assert offset == size, msg
 
     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
@@ -278,7 +340,7 @@ class PithosClient(PithosRestClient):
         for hash in missing:
             offset, bytes = hmap[hash]
             fileobj.seek(offset)
-            data = fileobj.read(bytes)
+            data = readall(fileobj, bytes)
             r = self._put_block_async(data, hash)
             flying.append(r)
             unfinished = self._watch_thread_limit(flying)
@@ -356,7 +418,7 @@ class PithosClient(PithosRestClient):
         :param public: (bool)
 
         :param container_info_cache: (dict) if given, avoid redundant calls to
-        server for container info (block size and hash information)
+            server for container info (block size and hash information)
         """
         self._assert_container()
 
@@ -364,8 +426,7 @@ class PithosClient(PithosRestClient):
             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                 f, size, container_info_cache)
         (hashes, hmap, offset) = ([], {}, 0)
-        if not content_type:
-            content_type = 'application/octet-stream'
+        content_type = content_type or 'application/octet-stream'
 
         self._calculate_blocks_for_upload(
             *block_info,
@@ -405,10 +466,7 @@ class PithosClient(PithosRestClient):
                 sendlog.info('%s blocks missing' % len(missing))
                 num_of_blocks = len(missing)
                 missing = self._upload_missing_blocks(
-                    missing,
-                    hmap,
-                    f,
-                    upload_gen)
+                    missing, hmap, f, upload_gen)
                 if missing:
                     if num_of_blocks == len(missing):
                         retries -= 1
@@ -417,9 +475,13 @@ class PithosClient(PithosRestClient):
                 else:
                     break
             if missing:
+                try:
+                    details = ['%s' % thread.exception for thread in missing]
+                except Exception:
+                    details = ['Also, failed to read thread exceptions']
                 raise ClientError(
                     '%s blocks failed to upload' % len(missing),
-                    details=['%s' % thread.exception for thread in missing])
+                    details=details)
         except KeyboardInterrupt:
             sendlog.info('- - - wait for threads to finish')
             for thread in activethreads():
@@ -431,6 +493,7 @@ class PithosClient(PithosRestClient):
             format='json',
             hashmap=True,
             content_type=content_type,
+            content_encoding=content_encoding,
             if_etag_match=if_etag_match,
             if_etag_not_match='*' if if_not_exist else None,
             etag=etag,
@@ -485,7 +548,7 @@ class PithosClient(PithosRestClient):
         :param public: (bool)
 
         :param container_info_cache: (dict) if given, avoid redundant calls to
-        server for container info (block size and hash information)
+            server for container info (block size and hash information)
         """
         self._assert_container()
 
@@ -552,20 +615,20 @@ class PithosClient(PithosRestClient):
                     tries -= 1
                 old_failures = len(missing)
             if missing:
-                raise ClientError(
-                    '%s blocks failed to upload' % len(missing),
-                    details=['%s' % thread.exception for thread in missing])
+                raise ClientError('%s blocks failed to upload' % len(missing))
         except KeyboardInterrupt:
             sendlog.info('- - - wait for threads to finish')
             for thread in activethreads():
                 thread.join()
             raise
+        self._cb_next()
 
         r = self.object_put(
             obj,
             format='json',
             hashmap=True,
             content_type=content_type,
+            content_encoding=content_encoding,
             if_etag_match=if_etag_match,
             if_etag_not_match='*' if if_not_exist else None,
             etag=etag,
@@ -595,15 +658,20 @@ class PithosClient(PithosRestClient):
         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
 
     def _dump_blocks_sync(
-            self, obj, remote_hashes, blocksize, total_size, dst, range,
+            self, obj, remote_hashes, blocksize, total_size, dst, crange,
             **args):
+        if not total_size:
+            return
         for blockid, blockhash in enumerate(remote_hashes):
             if blockhash:
                 start = blocksize * blockid
                 is_last = start + blocksize > total_size
                 end = (total_size - 1) if is_last else (start + blocksize - 1)
-                (start, end) = _range_up(start, end, range)
-                args['data_range'] = 'bytes=%s-%s' % (start, end)
+                data_range = _range_up(start, end, total_size, crange)
+                if not data_range:
+                    self._cb_next()
+                    continue
+                args['data_range'] = 'bytes=%s' % data_range
                 r = self.object_get(obj, success=(200, 206), **args)
                 self._cb_next()
                 dst.write(r.content)
@@ -616,7 +684,7 @@ class PithosClient(PithosRestClient):
 
     def _hash_from_file(self, fp, start, size, blockhash):
         fp.seek(start)
-        block = fp.read(size)
+        block = readall(fp, size)
         h = newhashlib(blockhash)
         h.update(block.strip('\x00'))
         return hexlify(h.digest())
@@ -649,9 +717,6 @@ class PithosClient(PithosRestClient):
         flying = dict()
         blockid_dict = dict()
         offset = 0
-        if filerange is not None:
-            rstart = int(filerange.split('-')[0])
-            offset = rstart if blocksize > rstart else rstart % blocksize
 
         self._init_thread_limit()
         for block_hash, blockids in remote_hashes.items():
@@ -668,12 +733,15 @@ class PithosClient(PithosRestClient):
                     **restargs)
                 end = total_size - 1 if (
                     key + blocksize > total_size) else key + blocksize - 1
-                start, end = _range_up(key, end, filerange)
-                if start == end:
+                if end < key:
+                    self._cb_next()
+                    continue
+                data_range = _range_up(key, end, total_size, filerange)
+                if not data_range:
                     self._cb_next()
                     continue
-                restargs['async_headers'] = {
-                    'Range': 'bytes=%s-%s' % (start, end)}
+                restargs[
+                    'async_headers'] = {'Range': 'bytes=%s' % data_range}
                 flying[key] = self._get_block_async(obj, **restargs)
                 blockid_dict[key] = unsaved
 
@@ -811,25 +879,31 @@ class PithosClient(PithosRestClient):
         ret = [''] * num_of_blocks
         self._init_thread_limit()
         flying = dict()
-        for blockid, blockhash in enumerate(remote_hashes):
-            start = blocksize * blockid
-            is_last = start + blocksize > total_size
-            end = (total_size - 1) if is_last else (start + blocksize - 1)
-            (start, end) = _range_up(start, end, range_str)
-            if start < end:
-                self._watch_thread_limit(flying.values())
-                flying[blockid] = self._get_block_async(obj, **restargs)
-            for runid, thread in flying.items():
-                if (blockid + 1) == num_of_blocks:
-                    thread.join()
-                elif thread.isAlive():
-                    continue
-                if thread.exception:
-                    raise thread.exception
-                ret[runid] = thread.value.content
-                self._cb_next()
-                flying.pop(runid)
-        return ''.join(ret)
+        try:
+            for blockid, blockhash in enumerate(remote_hashes):
+                start = blocksize * blockid
+                is_last = start + blocksize > total_size
+                end = (total_size - 1) if is_last else (start + blocksize - 1)
+                data_range_str = _range_up(start, end, end, range_str)
+                if data_range_str:
+                    self._watch_thread_limit(flying.values())
+                    restargs['data_range'] = 'bytes=%s' % data_range_str
+                    flying[blockid] = self._get_block_async(obj, **restargs)
+                for runid, thread in flying.items():
+                    if (blockid + 1) == num_of_blocks:
+                        thread.join()
+                    elif thread.isAlive():
+                        continue
+                    if thread.exception:
+                        raise thread.exception
+                    ret[runid] = thread.value.content
+                    self._cb_next()
+                    flying.pop(runid)
+            return ''.join(ret)
+        except KeyboardInterrupt:
+            sendlog.info('- - - wait for threads to finish')
+            for thread in activethreads():
+                thread.join()
 
     #Command Progress Bar method
     def _cb_next(self, step=1):
@@ -853,8 +927,7 @@ class PithosClient(PithosRestClient):
             if_match=None,
             if_none_match=None,
             if_modified_since=None,
-            if_unmodified_since=None,
-            data_range=None):
+            if_unmodified_since=None):
         """
         :param obj: (str) remote object path
 
@@ -866,9 +939,6 @@ class PithosClient(PithosRestClient):
 
         :param if_unmodified_since: (str) formated date
 
-        :param data_range: (str) from-to where from and to are integers
-            denoting file positions in bytes
-
         :returns: (list)
         """
         try:
@@ -879,8 +949,7 @@ class PithosClient(PithosRestClient):
                 if_etag_match=if_match,
                 if_etag_not_match=if_none_match,
                 if_modified_since=if_modified_since,
-                if_unmodified_since=if_unmodified_since,
-                data_range=data_range)
+                if_unmodified_since=if_unmodified_since)
         except ClientError as err:
             if err.status == 304 or err.status == 412:
                 return {}
@@ -893,7 +962,8 @@ class PithosClient(PithosRestClient):
 
         :param usernames: (list)
         """
-        self.account_post(update=True, groups={group: usernames})
+        r = self.account_post(update=True, groups={group: usernames})
+        return r
 
     def del_account_group(self, group):
         """
@@ -921,18 +991,18 @@ class PithosClient(PithosRestClient):
             'X-Account-Policy-Quota',
             exactMatch=True)
 
-    def get_account_versioning(self):
-        """
-        :returns: (dict)
-        """
-        return filter_in(
-            self.get_account_info(),
-            'X-Account-Policy-Versioning',
-            exactMatch=True)
+    #def get_account_versioning(self):
+    #    """
+    #    :returns: (dict)
+    #    """
+    #    return filter_in(
+    #        self.get_account_info(),
+    #        'X-Account-Policy-Versioning',
+    #        exactMatch=True)
 
     def get_account_meta(self, until=None):
         """
-        :meta until: (str) formated date
+        :param until: (str) formated date
 
         :returns: (dict)
         """
@@ -949,27 +1019,28 @@ class PithosClient(PithosRestClient):
         :param metapairs: (dict) {key1:val1, key2:val2, ...}
         """
         assert(type(metapairs) is dict)
-        self.account_post(update=True, metadata=metapairs)
+        r = self.account_post(update=True, metadata=metapairs)
+        return r.headers
 
     def del_account_meta(self, metakey):
         """
         :param metakey: (str) metadatum key
         """
-        self.account_post(update=True, metadata={metakey: ''})
+        r = self.account_post(update=True, metadata={metakey: ''})
+        return r.headers
 
-    """
-    def set_account_quota(self, quota):
-        ""
-        :param quota: (int)
-        ""
-        self.account_post(update=True, quota=quota)
-    """
+    #def set_account_quota(self, quota):
+    #    """
+    #    :param quota: (int)
+    #    """
+    #    self.account_post(update=True, quota=quota)
 
-    def set_account_versioning(self, versioning):
-        """
-        "param versioning: (str)
-        """
-        self.account_post(update=True, versioning=versioning)
+    #def set_account_versioning(self, versioning):
+    #    """
+    #    :param versioning: (str)
+    #    """
+    #    r = self.account_post(update=True, versioning=versioning)
+    #    return r.headers
 
     def list_containers(self):
         """
@@ -1001,6 +1072,7 @@ class PithosClient(PithosRestClient):
             raise ClientError(
                 'Container "%s" is not empty' % self.container,
                 r.status_code)
+        return r.headers
 
     def get_container_versioning(self, container=None):
         """
@@ -1032,7 +1104,7 @@ class PithosClient(PithosRestClient):
         finally:
             self.container = cnt_back_up
 
-    def get_container_info(self, until=None):
+    def get_container_info(self, container=None, until=None):
         """
         :param until: (str) formated date
 
@@ -1040,11 +1112,16 @@ class PithosClient(PithosRestClient):
 
         :raises ClientError: 404 Container not found
         """
+        bck_cont = self.container
         try:
+            self.container = container or bck_cont
+            self._assert_container()
             r = self.container_head(until=until)
         except ClientError as err:
             err.details.append('for container %s' % self.container)
             raise err
+        finally:
+            self.container = bck_cont
         return r.headers
 
     def get_container_meta(self, until=None):
@@ -1054,8 +1131,7 @@ class PithosClient(PithosRestClient):
         :returns: (dict)
         """
         return filter_in(
-            self.get_container_info(until=until),
-            'X-Container-Meta')
+            self.get_container_info(until=until), 'X-Container-Meta')
 
     def get_container_object_meta(self, until=None):
         """
@@ -1064,33 +1140,38 @@ class PithosClient(PithosRestClient):
         :returns: (dict)
         """
         return filter_in(
-            self.get_container_info(until=until),
-            'X-Container-Object-Meta')
+            self.get_container_info(until=until), 'X-Container-Object-Meta')
 
     def set_container_meta(self, metapairs):
         """
         :param metapairs: (dict) {key1:val1, key2:val2, ...}
         """
         assert(type(metapairs) is dict)
-        self.container_post(update=True, metadata=metapairs)
+        r = self.container_post(update=True, metadata=metapairs)
+        return r.headers
 
     def del_container_meta(self, metakey):
         """
         :param metakey: (str) metadatum key
+
+        :returns: (dict) response headers
         """
-        self.container_post(update=True, metadata={metakey: ''})
+        r = self.container_post(update=True, metadata={metakey: ''})
+        return r.headers
 
     def set_container_limit(self, limit):
         """
         :param limit: (int)
         """
-        self.container_post(update=True, quota=limit)
+        r = self.container_post(update=True, quota=limit)
+        return r.headers
 
     def set_container_versioning(self, versioning):
         """
         :param versioning: (str)
         """
-        self.container_post(update=True, versioning=versioning)
+        r = self.container_post(update=True, versioning=versioning)
+        return r.headers
 
     def del_object(self, obj, until=None, delimiter=None):
         """
@@ -1101,7 +1182,8 @@ class PithosClient(PithosRestClient):
         :param delimiter: (str)
         """
         self._assert_container()
-        self.object_delete(obj, until=until, delimiter=delimiter)
+        r = self.object_delete(obj, until=until, delimiter=delimiter)
+        return r.headers
 
     def set_object_meta(self, obj, metapairs):
         """
@@ -1110,7 +1192,8 @@ class PithosClient(PithosRestClient):
         :param metapairs: (dict) {key1:val1, key2:val2, ...}
         """
         assert(type(metapairs) is dict)
-        self.object_post(obj, update=True, metadata=metapairs)
+        r = self.object_post(obj, update=True, metadata=metapairs)
+        return r.headers
 
     def del_object_meta(self, obj, metakey):
         """
@@ -1118,7 +1201,8 @@ class PithosClient(PithosRestClient):
 
         :param metakey: (str) metadatum key
         """
-        self.object_post(obj, update=True, metadata={metakey: ''})
+        r = self.object_post(obj, update=True, metadata={metakey: ''})
+        return r.headers
 
     def publish_object(self, obj):
         """
@@ -1128,6 +1212,7 @@ class PithosClient(PithosRestClient):
         """
         self.object_post(obj, update=True, public=True)
         info = self.get_object_info(obj)
+        return info['x-object-public']
         pref, sep, rest = self.base_url.partition('//')
         base = rest.split('/')[0]
         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
@@ -1136,7 +1221,8 @@ class PithosClient(PithosRestClient):
         """
         :param obj: (str) remote object path
         """
-        self.object_post(obj, update=True, public=False)
+        r = self.object_post(obj, update=True, public=False)
+        return r.headers
 
     def get_object_info(self, obj, version=None):
         """
@@ -1190,28 +1276,31 @@ class PithosClient(PithosRestClient):
 
     def set_object_sharing(
             self, obj,
-            read_permition=False, write_permition=False):
+            read_permission=False, write_permission=False):
         """Give read/write permisions to an object.
 
         :param obj: (str) remote object path
 
-        :param read_permition: (list - bool) users and user groups that get
-            read permition for this object - False means all previous read
+        :param read_permission: (list - bool) users and user groups that get
+            read permission for this object - False means all previous read
             permissions will be removed
 
-        :param write_perimition: (list - bool) of users and user groups to get
-           write permition for this object - False means all previous write
+        :param write_permission: (list - bool) of users and user groups to get
+           write permission for this object - False means all previous write
            permissions will be removed
+
+        :returns: (dict) response headers
         """
 
-        perms = dict(read=read_permition or '', write=write_permition or '')
-        self.object_post(obj, update=True, permissions=perms)
+        perms = dict(read=read_permission or '', write=write_permission or '')
+        r = self.object_post(obj, update=True, permissions=perms)
+        return r.headers
 
     def del_object_sharing(self, obj):
         """
         :param obj: (str) remote object path
         """
-        self.set_object_sharing(obj)
+        return self.set_object_sharing(obj)
 
     def append_object(self, obj, source_file, upload_cb=None):
         """
@@ -1221,46 +1310,79 @@ class PithosClient(PithosRestClient):
 
         :param upload_db: progress.bar for uploading
         """
-
         self._assert_container()
         meta = self.get_container_info()
         blocksize = int(meta['x-container-block-size'])
         filesize = fstat(source_file.fileno()).st_size
         nblocks = 1 + (filesize - 1) // blocksize
         offset = 0
+        headers = {}
         if upload_cb:
-            upload_gen = upload_cb(nblocks)
-            upload_gen.next()
-        for i in range(nblocks):
-            block = source_file.read(min(blocksize, filesize - offset))
-            offset += len(block)
-            self.object_post(
-                obj,
-                update=True,
-                content_range='bytes */*',
-                content_type='application/octet-stream',
-                content_length=len(block),
-                data=block)
+            self.progress_bar_gen = upload_cb(nblocks)
+            self._cb_next()
+        flying = {}
+        self._init_thread_limit()
+        try:
+            for i in range(nblocks):
+                block = source_file.read(min(blocksize, filesize - offset))
+                offset += len(block)
 
-            if upload_cb:
-                upload_gen.next()
+                self._watch_thread_limit(flying.values())
+                unfinished = {}
+                flying[i] = SilentEvent(
+                    method=self.object_post,
+                    obj=obj,
+                    update=True,
+                    content_range='bytes */*',
+                    content_type='application/octet-stream',
+                    content_length=len(block),
+                    data=block)
+                flying[i].start()
+
+                for key, thread in flying.items():
+                    if thread.isAlive():
+                        if i < nblocks:
+                            unfinished[key] = thread
+                            continue
+                        thread.join()
+                    if thread.exception:
+                        raise thread.exception
+                    headers[key] = thread.value.headers
+                    self._cb_next()
+                flying = unfinished
+        except KeyboardInterrupt:
+            sendlog.info('- - - wait for threads to finish')
+            for thread in activethreads():
+                thread.join()
+        finally:
+            from time import sleep
+            sleep(2 * len(activethreads()))
+            self._cb_next()
+        return headers.values()
 
     def truncate_object(self, obj, upto_bytes):
         """
         :param obj: (str) remote object path
 
         :param upto_bytes: max number of bytes to leave on file
+
+        :returns: (dict) response headers
         """
-        self.object_post(
+        ctype = self.get_object_info(obj)['content-type']
+        r = self.object_post(
             obj,
             update=True,
             content_range='bytes 0-%s/*' % upto_bytes,
-            content_type='application/octet-stream',
+            content_type=ctype,
             object_bytes=upto_bytes,
             source_object=path4url(self.container, obj))
+        return r.headers
 
-    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
+    def overwrite_object(
+            self, obj, start, end, source_file,
+            source_version=None, upload_cb=None):
         """Overwrite a part of an object from local source file
+        ATTENTION: content_type must always be application/octet-stream
 
         :param obj: (str) remote object path
 
@@ -1273,30 +1395,26 @@ class PithosClient(PithosRestClient):
         :param upload_db: progress.bar for uploading
         """
 
-        r = self.get_object_info(obj)
-        rf_size = int(r['content-length'])
-        if rf_size < int(start):
-            raise ClientError(
-                'Range start exceeds file size',
-                status=416)
-        elif rf_size < int(end):
-            raise ClientError(
-                'Range end exceeds file size',
-                status=416)
         self._assert_container()
+        r = self.get_object_info(obj, version=source_version)
+        rf_size = int(r['content-length'])
+        start, end = int(start), int(end)
+        assert rf_size >= start, 'Range start %s exceeds file size %s' % (
+            start, rf_size)
         meta = self.get_container_info()
         blocksize = int(meta['x-container-block-size'])
         filesize = fstat(source_file.fileno()).st_size
-        datasize = int(end) - int(start) + 1
+        datasize = end - start + 1
         nblocks = 1 + (datasize - 1) // blocksize
         offset = 0
         if upload_cb:
-            upload_gen = upload_cb(nblocks)
-            upload_gen.next()
+            self.progress_bar_gen = upload_cb(nblocks)
+            self._cb_next()
+        headers = []
         for i in range(nblocks):
             read_size = min(blocksize, filesize - offset, datasize - offset)
             block = source_file.read(read_size)
-            self.object_post(
+            r = self.object_post(
                 obj,
                 update=True,
                 content_type='application/octet-stream',
@@ -1304,11 +1422,13 @@ class PithosClient(PithosRestClient):
                 content_range='bytes %s-%s/*' % (
                     start + offset,
                     start + offset + len(block) - 1),
+                source_version=source_version,
                 data=block)
+            headers.append(dict(r.headers))
             offset += len(block)
-
-            if upload_cb:
-                upload_gen.next()
+            self._cb_next()
+        self._cb_next()
+        return headers
 
     def copy_object(
             self, src_container, src_object, dst_container,
@@ -1336,11 +1456,13 @@ class PithosClient(PithosRestClient):
         :param content_type: (str)
 
         :param delimiter: (str)
+
+        :returns: (dict) response headers
         """
         self._assert_account()
         self.container = dst_container
         src_path = path4url(src_container, src_object)
-        self.object_put(
+        r = self.object_put(
             dst_object or src_object,
             success=201,
             copy_from=src_path,
@@ -1350,6 +1472,7 @@ class PithosClient(PithosRestClient):
             public=public,
             content_type=content_type,
             delimiter=delimiter)
+        return r.headers
 
     def move_object(
             self, src_container, src_object, dst_container,
@@ -1377,12 +1500,14 @@ class PithosClient(PithosRestClient):
         :param content_type: (str)
 
         :param delimiter: (str)
+
+        :returns: (dict) response headers
         """
         self._assert_account()
         self.container = dst_container
         dst_object = dst_object or src_object
         src_path = path4url(src_container, src_object)
-        self.object_put(
+        r = self.object_put(
             dst_object,
             success=201,
             move_from=src_path,
@@ -1392,6 +1517,7 @@ class PithosClient(PithosRestClient):
             public=public,
             content_type=content_type,
             delimiter=delimiter)
+        return r.headers
 
     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
         """Get accounts that share with self.account