Update copyright dates for changed files
[kamaki] / kamaki / clients / pithos / __init__.py
index 0cc0cd5..4eb7153 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 2011-2013 GRNET S.A. All rights reserved.
+# Copyright 2011-2014 GRNET S.A. All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or
 # without modification, are permitted provided that the following
@@ -36,14 +36,14 @@ from threading import enumerate as activethreads
 from os import fstat
 from hashlib import new as newhashlib
 from time import time
+from StringIO import StringIO
 
 from binascii import hexlify
 
 from kamaki.clients import SilentEvent, sendlog
-from kamaki.clients.pithos_rest_api import PithosRestAPI
+from kamaki.clients.pithos.rest_api import PithosRestClient
 from kamaki.clients.storage import ClientError
-from kamaki.clients.utils import path4url, filter_in
-from StringIO import StringIO
+from kamaki.clients.utils import path4url, filter_in, readall
 
 
 def _pithos_hash(block, blockhash):
@@ -52,37 +52,94 @@ def _pithos_hash(block, blockhash):
     return h.hexdigest()
 
 
-def _range_up(start, end, a_range):
-    if a_range:
-        (rstart, rend) = a_range.split('-')
-        (rstart, rend) = (int(rstart), int(rend))
-        if rstart > end or rend < start:
-            return (0, 0)
-        if rstart > start:
-            start = rstart
-        if rend < end:
-            end = rend
-    return (start, end)
-
+def _range_up(start, end, max_value, a_range):
+    """
+    :param start: (int) the window bottom
+
+    :param end: (int) the window top
+
+    :param max_value: (int) maximum accepted value
+
+    :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
+        where each X is x, x-y or -x, with x < y and x, y natural numbers
+
+    :returns: (str) the a_range string cut down to the start-end window;
+        an empty string means this window is out of range
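+
+    Illustrative example (values hypothetical): for a 100-byte object
+    (max_value=100) and the block window 0-49,
+        _range_up(0, 49, 100, '10-19,70-79,-5') returns '10-19'
+    since '70-79' and '-5' (the last 5 bytes) both fall outside the window.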
+    """
+    assert start >= 0, '_range_up called w. start(%s) < 0' % start
+    assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
+        end, start)
+    assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
+        max_value, end)
+    if not a_range:
+        return '%s-%s' % (start, end)
+    selected = []
+    for some_range in a_range.split(','):
+        v0, sep, v1 = some_range.partition('-')
+        if v0:
+            v0 = int(v0)
+            if sep:
+                v1 = int(v1)
+                if v1 < start or v0 > end or v1 < v0:
+                    continue
+                v0 = v0 if v0 > start else start
+                v1 = v1 if v1 < end else end
+                selected.append('%s-%s' % (v0, v1))
+            elif v0 < start:
+                continue
+            else:
+                v1 = v0 if v0 <= end else end
+                selected.append('%s-%s' % (start, v1))
+        else:
+            v1 = int(v1)
+            if max_value - v1 > end:
+                continue
+            v0 = (max_value - v1) if max_value - v1 > start else start
+            selected.append('%s-%s' % (v0, end))
+    return ','.join(selected)
 
-class PithosClient(PithosRestAPI):
-    """GRNet Pithos API client"""
 
-    _thread_exceptions = []
+class PithosClient(PithosRestClient):
+    """Synnefo Pithos+ API client"""
 
     def __init__(self, base_url, token, account=None, container=None):
         super(PithosClient, self).__init__(base_url, token, account, container)
 
-    def purge_container(self, container=None):
-        """Delete an empty container and destroy associated blocks
+    def create_container(
+            self,
+            container=None, sizelimit=None, versioning=None, metadata=None,
+            **kwargs):
+        """
+        :param container: (str) if not given, self.container is used instead
+
+        :param sizelimit: (int) container total size limit in bytes
+
+        :param versioning: (str) can be auto or whatever supported by server
+
+        :param metadata: (dict) Custom user-defined metadata of the form
+            { 'name1': 'value1', 'name2': 'value2', ... }
+
+        :returns: (dict) response headers
         """
         cnt_back_up = self.container
         try:
             self.container = container or cnt_back_up
+            r = self.container_put(
+                quota=sizelimit, versioning=versioning, metadata=metadata,
+                **kwargs)
+            return r.headers
+        finally:
+            self.container = cnt_back_up
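+
+    # Illustrative usage sketch (names and values hypothetical):
+    #   client = PithosClient(base_url, token, account_uuid, 'pithos')
+    #   headers = client.create_container(
+    #       'backups', sizelimit=5 * 1024 ** 3, versioning='auto',
+    #       metadata={'purpose': 'nightly'})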
+
+    def purge_container(self, container=None):
+        """Delete an empty container and destroy associated blocks"""
+        cnt_back_up = self.container
+        try:
+            self.container = container or cnt_back_up
             r = self.container_delete(until=unicode(time()))
         finally:
             self.container = cnt_back_up
-        r.release()
+        return r.headers
 
     def upload_object_unchunked(
             self, obj, f,
@@ -115,6 +172,8 @@ class PithosClient(PithosRestAPI):
             'write':[usr and/or grp names]}
 
         :param public: (bool)
+
+        :returns: (dict) created object metadata
         """
         self._assert_container()
 
@@ -130,7 +189,7 @@ class PithosClient(PithosRestAPI):
                 raise ClientError(msg, 1)
             f = StringIO(data)
         else:
-            data = f.read(size) if size else f.read()
+            data = readall(f, size) if size else f.read()
         r = self.object_put(
             obj,
             data=data,
@@ -141,7 +200,7 @@ class PithosClient(PithosRestAPI):
             permissions=sharing,
             public=public,
             success=201)
-        r.release()
+        return r.headers
 
     def create_object_by_manifestation(
             self, obj,
@@ -166,6 +225,8 @@ class PithosClient(PithosRestAPI):
             'write':[usr and/or grp names]}
 
         :param public: (bool)
+
+        :returns: (dict) created object metadata
         """
         self._assert_container()
         r = self.object_put(
@@ -178,10 +239,10 @@ class PithosClient(PithosRestAPI):
             permissions=sharing,
             public=public,
             manifest='%s/%s' % (self.container, obj))
-        r.release()
+        return r.headers
 
     # upload_* auxiliary methods
-    def _put_block_async(self, data, hash, upload_gen=None):
+    def _put_block_async(self, data, hash):
         event = SilentEvent(method=self._put_block, data=data, hash=hash)
         event.start()
         return event
@@ -195,21 +256,37 @@ class PithosClient(PithosRestAPI):
             format='json')
         assert r.json[0] == hash, 'Local hash does not match server'
 
-    def _get_file_block_info(self, fileobj, size=None):
-        meta = self.get_container_info()
+    def _get_file_block_info(self, fileobj, size=None, cache=None):
+        """
+        :param fileobj: (file descriptor) source
+
+        :param size: (int) size of data to upload from source
+
+        :param cache: (dict) if provided, cache container info response to
+            avoid redundant calls
+        """
+        if isinstance(cache, dict):
+            try:
+                meta = cache[self.container]
+            except KeyError:
+                meta = self.get_container_info()
+                cache[self.container] = meta
+        else:
+            meta = self.get_container_info()
         blocksize = int(meta['x-container-block-size'])
         blockhash = meta['x-container-block-hash']
         size = size if size is not None else fstat(fileobj.fileno()).st_size
         nblocks = 1 + (size - 1) // blocksize
         return (blocksize, blockhash, size, nblocks)
 
-    def _get_missing_hashes(
+    def _create_object_or_get_missing_hashes(
             self, obj, json,
             size=None,
             format='json',
             hashmap=True,
             content_type=None,
-            etag=None,
+            if_etag_match=None,
+            if_etag_not_match=None,
             content_encoding=None,
             content_disposition=None,
             permissions=None,
@@ -221,18 +298,16 @@ class PithosClient(PithosRestAPI):
             hashmap=True,
             content_type=content_type,
             json=json,
-            etag=etag,
+            if_etag_match=if_etag_match,
+            if_etag_not_match=if_etag_not_match,
             content_encoding=content_encoding,
             content_disposition=content_disposition,
             permissions=permissions,
             public=public,
             success=success)
-        if r.status_code == 201:
-            r.release()
-            return None
-        return r.json
+        return (None if r.status_code == 201 else r.json), r.headers
 
-    def _culculate_blocks_for_upload(
+    def _calculate_blocks_for_upload(
             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
             hash_cb=None):
         offset = 0
@@ -240,17 +315,19 @@ class PithosClient(PithosRestAPI):
             hash_gen = hash_cb(nblocks)
             hash_gen.next()
 
-        for i in range(nblocks):
-            block = fileobj.read(min(blocksize, size - offset))
+        for i in xrange(nblocks):
+            block = readall(fileobj, min(blocksize, size - offset))
             bytes = len(block)
+            if bytes <= 0:
+                break
             hash = _pithos_hash(block, blockhash)
             hashes.append(hash)
             hmap[hash] = (offset, bytes)
             offset += bytes
             if hash_cb:
                 hash_gen.next()
-        msg = 'Failed to calculate uploaded blocks:'
-        ' Offset and object size do not match'
+        msg = ('Failed to calculate uploading blocks: '
+               'read bytes(%s) != requested size (%s)' % (offset, size))
         assert offset == size, msg
 
     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
@@ -263,8 +340,8 @@ class PithosClient(PithosRestAPI):
         for hash in missing:
             offset, bytes = hmap[hash]
             fileobj.seek(offset)
-            data = fileobj.read(bytes)
-            r = self._put_block_async(data, hash, upload_gen)
+            data = readall(fileobj, bytes)
+            r = self._put_block_async(data, hash)
             flying.append(r)
             unfinished = self._watch_thread_limit(flying)
             for thread in set(flying).difference(unfinished):
@@ -301,11 +378,14 @@ class PithosClient(PithosRestAPI):
             hash_cb=None,
             upload_cb=None,
             etag=None,
+            if_etag_match=None,
+            if_not_exist=None,
             content_encoding=None,
             content_disposition=None,
             content_type=None,
             sharing=None,
-            public=None):
+            public=None,
+            container_info_cache=None):
         """Upload an object using multiple connections (threads)
 
         :param obj: (str) remote object path
@@ -318,6 +398,14 @@ class PithosClient(PithosRestAPI):
 
         :param etag: (str)
 
+        :param if_etag_match: (str) push this value to the if-match header at
+            file creation
+
+        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
+            it does not exist remotely, otherwise the operation will fail.
+            This covers the case where an object with the same path is created
+            while the object is being uploaded.
+
         :param content_encoding: (str)
 
         :param content_disposition: (str)
@@ -328,17 +416,19 @@ class PithosClient(PithosRestAPI):
             'write':[usr and/or grp names]}
 
         :param public: (bool)
+
+        :param container_info_cache: (dict) if given, avoid redundant calls to
+            server for container info (block size and hash information)
         """
         self._assert_container()
 
-        #init
-        block_info = (blocksize, blockhash, size, nblocks) =\
-            self._get_file_block_info(f, size)
+        block_info = (
+            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
+                f, size, container_info_cache)
         (hashes, hmap, offset) = ([], {}, 0)
-        if not content_type:
-            content_type = 'application/octet-stream'
+        content_type = content_type or 'application/octet-stream'
 
-        self._culculate_blocks_for_upload(
+        self._calculate_blocks_for_upload(
             *block_info,
             hashes=hashes,
             hmap=hmap,
@@ -346,18 +436,19 @@ class PithosClient(PithosRestAPI):
             hash_cb=hash_cb)
 
         hashmap = dict(bytes=size, hashes=hashes)
-        missing = self._get_missing_hashes(
+        missing, obj_headers = self._create_object_or_get_missing_hashes(
             obj, hashmap,
             content_type=content_type,
             size=size,
-            etag=etag,
+            if_etag_match=if_etag_match,
+            if_etag_not_match='*' if if_not_exist else None,
             content_encoding=content_encoding,
             content_disposition=content_disposition,
             permissions=sharing,
             public=public)
 
         if missing is None:
-            return
+            return obj_headers
 
         if upload_cb:
             upload_gen = upload_cb(len(missing))
@@ -375,10 +466,7 @@ class PithosClient(PithosRestAPI):
                 sendlog.info('%s blocks missing' % len(missing))
                 num_of_blocks = len(missing)
                 missing = self._upload_missing_blocks(
-                    missing,
-                    hmap,
-                    f,
-                    upload_gen)
+                    missing, hmap, f, upload_gen)
                 if missing:
                     if num_of_blocks == len(missing):
                         retries -= 1
@@ -387,23 +475,168 @@ class PithosClient(PithosRestAPI):
                 else:
                     break
             if missing:
+                try:
+                    details = ['%s' % thread.exception for thread in missing]
+                except Exception:
+                    details = ['Also, failed to read thread exceptions']
                 raise ClientError(
                     '%s blocks failed to upload' % len(missing),
-                    status=800)
+                    details=details)
+        except KeyboardInterrupt:
+            sendlog.info('- - - wait for threads to finish')
+            for thread in activethreads():
+                thread.join()
+            raise
+
+        r = self.object_put(
+            obj,
+            format='json',
+            hashmap=True,
+            content_type=content_type,
+            content_encoding=content_encoding,
+            if_etag_match=if_etag_match,
+            if_etag_not_match='*' if if_not_exist else None,
+            etag=etag,
+            json=hashmap,
+            permissions=sharing,
+            public=public,
+            success=201)
+        return r.headers
+
+    def upload_from_string(
+            self, obj, input_str,
+            hash_cb=None,
+            upload_cb=None,
+            etag=None,
+            if_etag_match=None,
+            if_not_exist=None,
+            content_encoding=None,
+            content_disposition=None,
+            content_type=None,
+            sharing=None,
+            public=None,
+            container_info_cache=None):
+        """Upload an object using multiple connections (threads)
+
+        :param obj: (str) remote object path
+
+        :param input_str: (str) upload content
+
+        :param hash_cb: optional progress.bar object for calculating hashes
+
+        :param upload_cb: optional progress.bar object for uploading
+
+        :param etag: (str)
+
+        :param if_etag_match: (str) push this value to the if-match header at
+            file creation
+
+        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
+            it does not exist remotely, otherwise the operation will fail.
+            This covers the case where an object with the same path is created
+            while the object is being uploaded.
+
+        :param content_encoding: (str)
+
+        :param content_disposition: (str)
+
+        :param content_type: (str)
+
+        :param sharing: {'read':[user and/or grp names],
+            'write':[usr and/or grp names]}
+
+        :param public: (bool)
+
+        :param container_info_cache: (dict) if given, avoid redundant calls to
+            server for container info (block size and hash information)
+        """
+        self._assert_container()
+
+        blocksize, blockhash, size, nblocks = self._get_file_block_info(
+                fileobj=None, size=len(input_str), cache=container_info_cache)
+        (hashes, hmap, offset) = ([], {}, 0)
+        content_type = content_type or 'application/octet-stream'
+
+        for blockid in range(nblocks):
+            start = blockid * blocksize
+            block = input_str[start: (start + blocksize)]
+            hashes.append(_pithos_hash(block, blockhash))
+            hmap[hashes[blockid]] = (start, block)
+
+        hashmap = dict(bytes=size, hashes=hashes)
+        missing, obj_headers = self._create_object_or_get_missing_hashes(
+            obj, hashmap,
+            content_type=content_type,
+            size=size,
+            if_etag_match=if_etag_match,
+            if_etag_not_match='*' if if_not_exist else None,
+            content_encoding=content_encoding,
+            content_disposition=content_disposition,
+            permissions=sharing,
+            public=public)
+        if missing is None:
+            return obj_headers
+        num_of_missing = len(missing)
+
+        if upload_cb:
+            self.progress_bar_gen = upload_cb(nblocks)
+            for i in range(nblocks + 1 - num_of_missing):
+                self._cb_next()
+
+        tries = 7
+        old_failures = 0
+        try:
+            while tries and missing:
+                flying = []
+                failures = []
+                for hash in missing:
+                    offset, block = hmap[hash]
+                    bird = self._put_block_async(block, hash)
+                    flying.append(bird)
+                    unfinished = self._watch_thread_limit(flying)
+                    for thread in set(flying).difference(unfinished):
+                        if thread.exception:
+                            failures.append(thread.kwargs['hash'])
+                        if thread.isAlive():
+                            flying.append(thread)
+                        else:
+                            self._cb_next()
+                    flying = unfinished
+                for thread in flying:
+                    thread.join()
+                    if thread.exception:
+                        failures.append(thread.kwargs['hash'])
+                    self._cb_next()
+                missing = failures
+                if missing and len(missing) == old_failures:
+                    tries -= 1
+                old_failures = len(missing)
+            if missing:
+                raise ClientError('%s blocks failed to upload' % len(missing))
         except KeyboardInterrupt:
             sendlog.info('- - - wait for threads to finish')
             for thread in activethreads():
                 thread.join()
             raise
+        self._cb_next()
 
         r = self.object_put(
             obj,
             format='json',
             hashmap=True,
             content_type=content_type,
+            content_encoding=content_encoding,
+            if_etag_match=if_etag_match,
+            if_etag_not_match='*' if if_not_exist else None,
+            etag=etag,
             json=hashmap,
+            permissions=sharing,
+            public=public,
             success=201)
-        r.release()
+        return r.headers
 
     # download_* auxiliary methods
     def _get_remote_blocks_info(self, obj, **restargs):
@@ -417,19 +650,28 @@ class PithosClient(PithosRestAPI):
         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
         map_dict = {}
         for i, h in enumerate(hashmap['hashes']):
-            map_dict[h] = i
+            if h in map_dict:
+                map_dict[h].append(i)
+            else:
+                map_dict[h] = [i]
         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
 
     def _dump_blocks_sync(
-            self, obj, remote_hashes, blocksize, total_size, dst, range,
+            self, obj, remote_hashes, blocksize, total_size, dst, crange,
             **args):
+        if not total_size:
+            return
         for blockid, blockhash in enumerate(remote_hashes):
             if blockhash:
                 start = blocksize * blockid
                 is_last = start + blocksize > total_size
                 end = (total_size - 1) if is_last else (start + blocksize - 1)
-                (start, end) = _range_up(start, end, range)
-                args['data_range'] = 'bytes=%s-%s' % (start, end)
+                data_range = _range_up(start, end, total_size, crange)
+                if not data_range:
+                    self._cb_next()
+                    continue
+                args['data_range'] = 'bytes=%s' % data_range
                 r = self.object_get(obj, success=(200, 206), **args)
                 self._cb_next()
                 dst.write(r.content)
@@ -442,67 +684,70 @@ class PithosClient(PithosRestAPI):
 
     def _hash_from_file(self, fp, start, size, blockhash):
         fp.seek(start)
-        block = fp.read(size)
+        block = readall(fp, size)
         h = newhashlib(blockhash)
         h.update(block.strip('\x00'))
         return hexlify(h.digest())
 
-    def _thread2file(self, flying, local_file, offset=0, **restargs):
+    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
         """write the results of a greenleted rest call to a file
 
         :param offset: the offset of the file up to blocksize
         - e.g. if the range is 10-100, all blocks will be written to
         normal_position - 10
         """
-        finished = []
-        for i, (start, g) in enumerate(flying.items()):
-            if not g.isAlive():
-                if g.exception:
-                    raise g.exception
-                block = g.value.content
-                local_file.seek(start - offset)
+        for key, g in flying.items():
+            if g.isAlive():
+                continue
+            if g.exception:
+                raise g.exception
+            block = g.value.content
+            for block_start in blockids[key]:
+                local_file.seek(block_start + offset)
                 local_file.write(block)
                 self._cb_next()
-                finished.append(flying.pop(start))
+            flying.pop(key)
+            blockids.pop(key)
         local_file.flush()
-        return finished
 
     def _dump_blocks_async(
             self, obj, remote_hashes, blocksize, total_size, local_file,
             blockhash=None, resume=False, filerange=None, **restargs):
         file_size = fstat(local_file.fileno()).st_size if resume else 0
-        flying = {}
-        finished = []
+        flying = dict()
+        blockid_dict = dict()
         offset = 0
-        if filerange is not None:
-            rstart = int(filerange.split('-')[0])
-            offset = rstart if blocksize > rstart else rstart % blocksize
 
         self._init_thread_limit()
-        for block_hash, blockid in remote_hashes.items():
-            start = blocksize * blockid
-            if start < file_size and block_hash == self._hash_from_file(
-                    local_file, start, blocksize, blockhash):
-                self._cb_next()
-                continue
-            self._watch_thread_limit(flying.values())
-            finished += self._thread2file(
-                flying,
-                local_file,
-                offset,
-                **restargs)
-            end = total_size - 1 if start + blocksize > total_size\
-                else start + blocksize - 1
-            (start, end) = _range_up(start, end, filerange)
-            if start == end:
-                self._cb_next()
-                continue
-            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
-            flying[start] = self._get_block_async(obj, **restargs)
+        for block_hash, blockids in remote_hashes.items():
+            blockids = [blk * blocksize for blk in blockids]
+            unsaved = [blk for blk in blockids if not (
+                blk < file_size and block_hash == self._hash_from_file(
+                        local_file, blk, blocksize, blockhash))]
+            self._cb_next(len(blockids) - len(unsaved))
+            if unsaved:
+                key = unsaved[0]
+                self._watch_thread_limit(flying.values())
+                self._thread2file(
+                    flying, blockid_dict, local_file, offset,
+                    **restargs)
+                end = total_size - 1 if (
+                    key + blocksize > total_size) else key + blocksize - 1
+                if end < key:
+                    self._cb_next()
+                    continue
+                data_range = _range_up(key, end, total_size, filerange)
+                if not data_range:
+                    self._cb_next()
+                    continue
+                restargs[
+                    'async_headers'] = {'Range': 'bytes=%s' % data_range}
+                flying[key] = self._get_block_async(obj, **restargs)
+                blockid_dict[key] = unsaved
 
         for thread in flying.values():
             thread.join()
-        finished += self._thread2file(flying, local_file, offset, **restargs)
+        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
 
     def download_object(
             self, obj, dst,
@@ -552,7 +797,7 @@ class PithosClient(PithosRestAPI):
         assert total_size >= 0
 
         if download_cb:
-            self.progress_bar_gen = download_cb(len(remote_hashes))
+            self.progress_bar_gen = download_cb(len(hash_list))
             self._cb_next()
 
         if dst.isatty():
@@ -580,11 +825,92 @@ class PithosClient(PithosRestAPI):
 
         self._complete_cb()
 
+    def download_to_string(
+            self, obj,
+            download_cb=None,
+            version=None,
+            range_str=None,
+            if_match=None,
+            if_none_match=None,
+            if_modified_since=None,
+            if_unmodified_since=None):
+        """Download an object to a string (multiple connections). This method
+        uses threads for http requests, but stores all content in memory.
+
+        :param obj: (str) remote object path
+
+        :param download_cb: optional progress.bar object for downloading
+
+        :param version: (str) file version
+
+        :param range_str: (str) from-to, where from and to are file positions
+            (int) in bytes
+
+        :param if_match: (str)
+
+        :param if_none_match: (str)
+
+        :param if_modified_since: (str) formatted date
+
+        :param if_unmodified_since: (str) formatted date
+
+        :returns: (str) the whole object contents
+        """
+        restargs = dict(
+            version=version,
+            data_range=None if range_str is None else 'bytes=%s' % range_str,
+            if_match=if_match,
+            if_none_match=if_none_match,
+            if_modified_since=if_modified_since,
+            if_unmodified_since=if_unmodified_since)
+
+        (
+            blocksize,
+            blockhash,
+            total_size,
+            hash_list,
+            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
+        assert total_size >= 0
+
+        if download_cb:
+            self.progress_bar_gen = download_cb(len(hash_list))
+            self._cb_next()
+
+        num_of_blocks = len(remote_hashes)
+        ret = [''] * num_of_blocks
+        self._init_thread_limit()
+        flying = dict()
+        try:
+            for blockid, blockhash in enumerate(remote_hashes):
+                start = blocksize * blockid
+                is_last = start + blocksize > total_size
+                end = (total_size - 1) if is_last else (start + blocksize - 1)
+                data_range_str = _range_up(start, end, end, range_str)
+                if data_range_str:
+                    self._watch_thread_limit(flying.values())
+                    restargs['data_range'] = 'bytes=%s' % data_range_str
+                    flying[blockid] = self._get_block_async(obj, **restargs)
+                for runid, thread in flying.items():
+                    if (blockid + 1) == num_of_blocks:
+                        thread.join()
+                    elif thread.isAlive():
+                        continue
+                    if thread.exception:
+                        raise thread.exception
+                    ret[runid] = thread.value.content
+                    self._cb_next()
+                    flying.pop(runid)
+            return ''.join(ret)
+        except KeyboardInterrupt:
+            sendlog.info('- - - wait for threads to finish')
+            for thread in activethreads():
+                thread.join()
+            raise
+
     #Command Progress Bar method
-    def _cb_next(self):
+    def _cb_next(self, step=1):
         if hasattr(self, 'progress_bar_gen'):
             try:
-                self.progress_bar_gen.next()
+                for i in xrange(step):
+                    self.progress_bar_gen.next()
             except:
                 pass
 
@@ -601,8 +927,7 @@ class PithosClient(PithosRestAPI):
             if_match=None,
             if_none_match=None,
             if_modified_since=None,
-            if_unmodified_since=None,
-            data_range=None):
+            if_unmodified_since=None):
         """
         :param obj: (str) remote object path
 
@@ -614,9 +939,6 @@ class PithosClient(PithosRestAPI):
 
         :param if_unmodified_since: (str) formatted date
 
-        :param data_range: (str) from-to where from and to are integers
-            denoting file positions in bytes
-
         :returns: (list)
         """
         try:
@@ -627,8 +949,7 @@ class PithosClient(PithosRestAPI):
                 if_etag_match=if_match,
                 if_etag_not_match=if_none_match,
                 if_modified_since=if_modified_since,
-                if_unmodified_since=if_unmodified_since,
-                data_range=data_range)
+                if_unmodified_since=if_unmodified_since)
         except ClientError as err:
             if err.status == 304 or err.status == 412:
                 return {}
@@ -642,14 +963,13 @@ class PithosClient(PithosRestAPI):
         :param usernames: (list)
         """
         r = self.account_post(update=True, groups={group: usernames})
-        r.release()
+        return r
 
     def del_account_group(self, group):
         """
         :param group: (str)
         """
-        r = self.account_post(update=True, groups={group: []})
-        r.release()
+        self.account_post(update=True, groups={group: []})
 
     def get_account_info(self, until=None):
         """
@@ -671,18 +991,18 @@ class PithosClient(PithosRestAPI):
             'X-Account-Policy-Quota',
             exactMatch=True)
 
-    def get_account_versioning(self):
-        """
-        :returns: (dict)
-        """
-        return filter_in(
-            self.get_account_info(),
-            'X-Account-Policy-Versioning',
-            exactMatch=True)
+    #def get_account_versioning(self):
+    #    """
+    #    :returns: (dict)
+    #    """
+    #    return filter_in(
+    #        self.get_account_info(),
+    #        'X-Account-Policy-Versioning',
+    #        exactMatch=True)
 
     def get_account_meta(self, until=None):
         """
-        :meta until: (str) formated date
+        :param until: (str) formatted date
 
         :returns: (dict)
         """
@@ -700,28 +1020,27 @@ class PithosClient(PithosRestAPI):
         """
         assert(type(metapairs) is dict)
         r = self.account_post(update=True, metadata=metapairs)
-        r.release()
+        return r.headers
 
     def del_account_meta(self, metakey):
         """
         :param metakey: (str) metadatum key
         """
         r = self.account_post(update=True, metadata={metakey: ''})
-        r.release()
+        return r.headers
 
-    def set_account_quota(self, quota):
-        """
-        :param quota: (int)
-        """
-        r = self.account_post(update=True, quota=quota)
-        r.release()
+    #def set_account_quota(self, quota):
+    #    """
+    #    :param quota: (int)
+    #    """
+    #    self.account_post(update=True, quota=quota)
 
-    def set_account_versioning(self, versioning):
-        """
-        "param versioning: (str)
-        """
-        r = self.account_post(update=True, versioning=versioning)
-        r.release()
+    #def set_account_versioning(self, versioning):
+    #    """
+    #    :param versioning: (str)
+    #    """
+    #    r = self.account_post(update=True, versioning=versioning)
+    #    return r.headers
 
     def list_containers(self):
         """
@@ -745,7 +1064,6 @@ class PithosClient(PithosRestAPI):
             until=until,
             delimiter=delimiter,
             success=(204, 404, 409))
-        r.release()
         if r.status_code == 404:
             raise ClientError(
                 'Container "%s" does not exist' % self.container,
@@ -754,6 +1072,7 @@ class PithosClient(PithosRestAPI):
             raise ClientError(
                 'Container "%s" is not empty' % self.container,
                 r.status_code)
+        return r.headers
 
     def get_container_versioning(self, container=None):
         """
@@ -770,7 +1089,7 @@ class PithosClient(PithosRestAPI):
         finally:
             self.container = cnt_back_up
 
-    def get_container_quota(self, container=None):
+    def get_container_limit(self, container=None):
         """
         :param container: (str)
 
@@ -785,7 +1104,7 @@ class PithosClient(PithosRestAPI):
         finally:
             self.container = cnt_back_up
 
-    def get_container_info(self, until=None):
+    def get_container_info(self, container=None, until=None):
         """
         :param until: (str) formatted date
 
@@ -793,11 +1112,16 @@ class PithosClient(PithosRestAPI):
 
         :raises ClientError: 404 Container not found
         """
+        bck_cont = self.container
         try:
+            self.container = container or bck_cont
+            self._assert_container()
             r = self.container_head(until=until)
         except ClientError as err:
             err.details.append('for container %s' % self.container)
             raise err
+        finally:
+            self.container = bck_cont
         return r.headers
 
     def get_container_meta(self, until=None):
@@ -807,8 +1131,7 @@ class PithosClient(PithosRestAPI):
         :returns: (dict)
         """
         return filter_in(
-            self.get_container_info(until=until),
-            'X-Container-Meta')
+            self.get_container_info(until=until), 'X-Container-Meta')
 
     def get_container_object_meta(self, until=None):
         """
@@ -817,8 +1140,7 @@ class PithosClient(PithosRestAPI):
         :returns: (dict)
         """
         return filter_in(
-            self.get_container_info(until=until),
-            'X-Container-Object-Meta')
+            self.get_container_info(until=until), 'X-Container-Object-Meta')
 
     def set_container_meta(self, metapairs):
         """
@@ -826,28 +1148,30 @@ class PithosClient(PithosRestAPI):
         """
         assert(type(metapairs) is dict)
         r = self.container_post(update=True, metadata=metapairs)
-        r.release()
+        return r.headers
 
     def del_container_meta(self, metakey):
         """
         :param metakey: (str) metadatum key
+
+        :returns: (dict) response headers
         """
         r = self.container_post(update=True, metadata={metakey: ''})
-        r.release()
+        return r.headers
 
-    def set_container_quota(self, quota):
+    def set_container_limit(self, limit):
         """
-        :param quota: (int)
+        :param limit: (int)
         """
-        r = self.container_post(update=True, quota=quota)
-        r.release()
+        r = self.container_post(update=True, quota=limit)
+        return r.headers
 
     def set_container_versioning(self, versioning):
         """
         :param versioning: (str)
         """
         r = self.container_post(update=True, versioning=versioning)
-        r.release()
+        return r.headers
 
     def del_object(self, obj, until=None, delimiter=None):
         """
@@ -859,7 +1183,7 @@ class PithosClient(PithosRestAPI):
         """
         self._assert_container()
         r = self.object_delete(obj, until=until, delimiter=delimiter)
-        r.release()
+        return r.headers
 
     def set_object_meta(self, obj, metapairs):
         """
@@ -869,7 +1193,7 @@ class PithosClient(PithosRestAPI):
         """
         assert(type(metapairs) is dict)
         r = self.object_post(obj, update=True, metadata=metapairs)
-        r.release()
+        return r.headers
 
     def del_object_meta(self, obj, metakey):
         """
@@ -878,7 +1202,7 @@ class PithosClient(PithosRestAPI):
         :param metakey: (str) metadatum key
         """
         r = self.object_post(obj, update=True, metadata={metakey: ''})
-        r.release()
+        return r.headers
 
     def publish_object(self, obj):
         """
@@ -886,9 +1210,9 @@ class PithosClient(PithosRestAPI):
 
         :returns: (str) access url
         """
-        r = self.object_post(obj, update=True, public=True)
-        r.release()
+        self.object_post(obj, update=True, public=True)
         info = self.get_object_info(obj)
+        return info['x-object-public']
-        pref, sep, rest = self.base_url.partition('//')
-        base = rest.split('/')[0]
-        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
@@ -898,7 +1222,7 @@ class PithosClient(PithosRestAPI):
         :param obj: (str) remote object path
         """
         r = self.object_post(obj, update=True, public=False)
-        r.release()
+        return r.headers
 
     def get_object_info(self, obj, version=None):
         """
@@ -952,29 +1276,31 @@ class PithosClient(PithosRestAPI):
 
     def set_object_sharing(
             self, obj,
-            read_permition=False, write_permition=False):
+            read_permission=False, write_permission=False):
         """Give read/write permisions to an object.
 
         :param obj: (str) remote object path
 
-        :param read_permition: (list - bool) users and user groups that get
-            read permition for this object - False means all previous read
+        :param read_permission: (list - bool) users and user groups that get
+            read permission for this object - False means all previous read
             permissions will be removed
 
-        :param write_perimition: (list - bool) of users and user groups to get
-           write permition for this object - False means all previous write
+        :param write_permission: (list - bool) of users and user groups to get
+           write permission for this object - False means all previous write
            permissions will be removed
+
+        :returns: (dict) response headers
         """
 
-        perms = dict(read=read_permition or '', write=write_permition or '')
+        perms = dict(read=read_permission or '', write=write_permission or '')
         r = self.object_post(obj, update=True, permissions=perms)
-        r.release()
+        return r.headers
 
     def del_object_sharing(self, obj):
         """
         :param obj: (str) remote object path
         """
-        self.set_object_sharing(obj)
+        return self.set_object_sharing(obj)
 
     def append_object(self, obj, source_file, upload_cb=None):
         """
@@ -984,48 +1310,79 @@ class PithosClient(PithosRestAPI):
 
         :param upload_cb: progress.bar for uploading
         """
-
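+        # Illustrative: append a local file to a remote object
+        # (paths hypothetical):
+        #   with open('extra.log', 'rb') as src:
+        #       client.append_object('logs/app.log', src)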
         self._assert_container()
         meta = self.get_container_info()
         blocksize = int(meta['x-container-block-size'])
         filesize = fstat(source_file.fileno()).st_size
         nblocks = 1 + (filesize - 1) // blocksize
         offset = 0
+        headers = {}
         if upload_cb:
-            upload_gen = upload_cb(nblocks)
-            upload_gen.next()
-        for i in range(nblocks):
-            block = source_file.read(min(blocksize, filesize - offset))
-            offset += len(block)
-            r = self.object_post(
-                obj,
-                update=True,
-                content_range='bytes */*',
-                content_type='application/octet-stream',
-                content_length=len(block),
-                data=block)
-            r.release()
-
-            if upload_cb:
-                upload_gen.next()
+            self.progress_bar_gen = upload_cb(nblocks)
+            self._cb_next()
+        flying = {}
+        self._init_thread_limit()
+        try:
+            for i in range(nblocks):
+                block = source_file.read(min(blocksize, filesize - offset))
+                offset += len(block)
+
+                self._watch_thread_limit(flying.values())
+                unfinished = {}
+                flying[i] = SilentEvent(
+                    method=self.object_post,
+                    obj=obj,
+                    update=True,
+                    content_range='bytes */*',
+                    content_type='application/octet-stream',
+                    content_length=len(block),
+                    data=block)
+                flying[i].start()
+
+                for key, thread in flying.items():
+                    if thread.isAlive():
+                        if i < nblocks:
+                            unfinished[key] = thread
+                            continue
+                        thread.join()
+                    if thread.exception:
+                        raise thread.exception
+                    headers[key] = thread.value.headers
+                    self._cb_next()
+                flying = unfinished
+        except KeyboardInterrupt:
+            sendlog.info('- - - wait for threads to finish')
+            for thread in activethreads():
+                thread.join()
+        finally:
+            from time import sleep
+            sleep(2 * len(activethreads()))
+            self._cb_next()
+        return headers.values()
 
     def truncate_object(self, obj, upto_bytes):
         """
         :param obj: (str) remote object path
 
         :param upto_bytes: max number of bytes to leave on file
+
+        :returns: (dict) response headers
         """
+        ctype = self.get_object_info(obj)['content-type']
         r = self.object_post(
             obj,
             update=True,
             content_range='bytes 0-%s/*' % upto_bytes,
-            content_type='application/octet-stream',
+            content_type=ctype,
             object_bytes=upto_bytes,
             source_object=path4url(self.container, obj))
-        r.release()
+        return r.headers
 
-    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
+    def overwrite_object(
+            self, obj, start, end, source_file,
+            source_version=None, upload_cb=None):
         """Overwrite a part of an object from local source file
+        ATTENTION: content_type must always be application/octet-stream
 
         :param obj: (str) remote object path
 
@@ -1038,26 +1395,22 @@ class PithosClient(PithosRestAPI):
         :param upload_cb: progress.bar for uploading
+
+        :param source_version: (str) the object version to overwrite, if not
+            the current one
         """
 
-        r = self.get_object_info(obj)
-        rf_size = int(r['content-length'])
-        if rf_size < int(start):
-            raise ClientError(
-                'Range start exceeds file size',
-                status=416)
-        elif rf_size < int(end):
-            raise ClientError(
-                'Range end exceeds file size',
-                status=416)
         self._assert_container()
+        r = self.get_object_info(obj, version=source_version)
+        rf_size = int(r['content-length'])
+        start, end = int(start), int(end)
+        assert rf_size >= start, 'Range start %s exceeds file size %s' % (
+            start, rf_size)
         meta = self.get_container_info()
         blocksize = int(meta['x-container-block-size'])
         filesize = fstat(source_file.fileno()).st_size
-        datasize = int(end) - int(start) + 1
+        datasize = end - start + 1
         nblocks = 1 + (datasize - 1) // blocksize
         offset = 0
         if upload_cb:
-            upload_gen = upload_cb(nblocks)
-            upload_gen.next()
+            self.progress_bar_gen = upload_cb(nblocks)
+            self._cb_next()
+        headers = []
         for i in range(nblocks):
             read_size = min(blocksize, filesize - offset, datasize - offset)
             block = source_file.read(read_size)
@@ -1069,12 +1422,13 @@ class PithosClient(PithosRestAPI):
                 content_range='bytes %s-%s/*' % (
                     start + offset,
                     start + offset + len(block) - 1),
+                source_version=source_version,
                 data=block)
+            headers.append(dict(r.headers))
             offset += len(block)
-            r.release()
-
-            if upload_cb:
-                upload_gen.next()
+            self._cb_next()
+        self._cb_next()
+        return headers
 
     def copy_object(
             self, src_container, src_object, dst_container,
@@ -1102,6 +1456,8 @@ class PithosClient(PithosRestAPI):
         :param content_type: (str)
 
         :param delimiter: (str)
+
+        :returns: (dict) response headers
         """
         self._assert_account()
         self.container = dst_container
@@ -1116,7 +1472,7 @@ class PithosClient(PithosRestAPI):
             public=public,
             content_type=content_type,
             delimiter=delimiter)
-        r.release()
+        return r.headers
 
     def move_object(
             self, src_container, src_object, dst_container,
@@ -1144,6 +1500,8 @@ class PithosClient(PithosRestAPI):
         :param content_type: (str)
 
         :param delimiter: (str)
+
+        :returns: (dict) response headers
         """
         self._assert_account()
         self.container = dst_container
@@ -1159,7 +1517,7 @@ class PithosClient(PithosRestAPI):
             public=public,
             content_type=content_type,
             delimiter=delimiter)
-        r.release()
+        return r.headers
 
     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
         """Get accounts that share with self.account