Fix all minor typos and modifications in tests
[kamaki] / kamaki / clients / pithos / __init__.py
index 0cc0cd5..b0deade 100644 (file)
@@ -36,14 +36,14 @@ from threading import enumerate as activethreads
 from os import fstat
 from hashlib import new as newhashlib
 from time import time
 from os import fstat
 from hashlib import new as newhashlib
 from time import time
+from StringIO import StringIO
 
 from binascii import hexlify
 
 from kamaki.clients import SilentEvent, sendlog
 
 from binascii import hexlify
 
 from kamaki.clients import SilentEvent, sendlog
-from kamaki.clients.pithos_rest_api import PithosRestAPI
+from kamaki.clients.pithos.rest_api import PithosRestClient
 from kamaki.clients.storage import ClientError
 from kamaki.clients.utils import path4url, filter_in
 from kamaki.clients.storage import ClientError
 from kamaki.clients.utils import path4url, filter_in
-from StringIO import StringIO
 
 
 def _pithos_hash(block, blockhash):
 
 
 def _pithos_hash(block, blockhash):
@@ -65,14 +65,36 @@ def _range_up(start, end, a_range):
     return (start, end)
 
 
     return (start, end)
 
 
-class PithosClient(PithosRestAPI):
-    """GRNet Pithos API client"""
-
-    _thread_exceptions = []
+class PithosClient(PithosRestClient):
+    """Synnefo Pithos+ API client"""
 
     def __init__(self, base_url, token, account=None, container=None):
         super(PithosClient, self).__init__(base_url, token, account, container)
 
 
     def __init__(self, base_url, token, account=None, container=None):
         super(PithosClient, self).__init__(base_url, token, account, container)
 
+    def create_container(
+            self,
+            container=None, sizelimit=None, versioning=None, metadata=None):
+        """
+        :param container: (str) if not given, self.container is used instead
+
+        :param sizelimit: (int) container total size limit in bytes
+
+        :param versioning: (str) can be auto or whatever supported by server
+
+        :param metadata: (dict) Custom user-defined metadata of the form
+            { 'name1': 'value1', 'name2': 'value2', ... }
+
+        :returns: (dict) response headers
+        """
+        cnt_back_up = self.container
+        try:
+            self.container = container or cnt_back_up
+            r = self.container_put(
+                quota=sizelimit, versioning=versioning, metadata=metadata)
+            return r.headers
+        finally:
+            self.container = cnt_back_up
+
     def purge_container(self, container=None):
         """Delete an empty container and destroy associated blocks
         """
     def purge_container(self, container=None):
         """Delete an empty container and destroy associated blocks
         """
@@ -82,7 +104,7 @@ class PithosClient(PithosRestAPI):
             r = self.container_delete(until=unicode(time()))
         finally:
             self.container = cnt_back_up
             r = self.container_delete(until=unicode(time()))
         finally:
             self.container = cnt_back_up
-        r.release()
+        return r.headers
 
     def upload_object_unchunked(
             self, obj, f,
 
     def upload_object_unchunked(
             self, obj, f,
@@ -115,6 +137,8 @@ class PithosClient(PithosRestAPI):
             'write':[usr and/or grp names]}
 
         :param public: (bool)
             'write':[usr and/or grp names]}
 
         :param public: (bool)
+
+        :returns: (dict) created object metadata
         """
         self._assert_container()
 
         """
         self._assert_container()
 
@@ -141,7 +165,7 @@ class PithosClient(PithosRestAPI):
             permissions=sharing,
             public=public,
             success=201)
             permissions=sharing,
             public=public,
             success=201)
-        r.release()
+        return r.headers
 
     def create_object_by_manifestation(
             self, obj,
 
     def create_object_by_manifestation(
             self, obj,
@@ -166,6 +190,8 @@ class PithosClient(PithosRestAPI):
             'write':[usr and/or grp names]}
 
         :param public: (bool)
             'write':[usr and/or grp names]}
 
         :param public: (bool)
+
+        :returns: (dict) created object metadata
         """
         self._assert_container()
         r = self.object_put(
         """
         self._assert_container()
         r = self.object_put(
@@ -178,10 +204,10 @@ class PithosClient(PithosRestAPI):
             permissions=sharing,
             public=public,
             manifest='%s/%s' % (self.container, obj))
             permissions=sharing,
             public=public,
             manifest='%s/%s' % (self.container, obj))
-        r.release()
+        return r.headers
 
     # upload_* auxiliary methods
 
     # upload_* auxiliary methods
-    def _put_block_async(self, data, hash, upload_gen=None):
+    def _put_block_async(self, data, hash):
         event = SilentEvent(method=self._put_block, data=data, hash=hash)
         event.start()
         return event
         event = SilentEvent(method=self._put_block, data=data, hash=hash)
         event.start()
         return event
@@ -195,21 +221,37 @@ class PithosClient(PithosRestAPI):
             format='json')
         assert r.json[0] == hash, 'Local hash does not match server'
 
             format='json')
         assert r.json[0] == hash, 'Local hash does not match server'
 
-    def _get_file_block_info(self, fileobj, size=None):
-        meta = self.get_container_info()
+    def _get_file_block_info(self, fileobj, size=None, cache=None):
+        """
+        :param fileobj: (file descriptor) source
+
+        :param size: (int) size of data to upload from source
+
+        :param cache: (dict) if provided, cache container info response to
+        avoid redundant calls
+        """
+        if isinstance(cache, dict):
+            try:
+                meta = cache[self.container]
+            except KeyError:
+                meta = self.get_container_info()
+                cache[self.container] = meta
+        else:
+            meta = self.get_container_info()
         blocksize = int(meta['x-container-block-size'])
         blockhash = meta['x-container-block-hash']
         size = size if size is not None else fstat(fileobj.fileno()).st_size
         nblocks = 1 + (size - 1) // blocksize
         return (blocksize, blockhash, size, nblocks)
 
         blocksize = int(meta['x-container-block-size'])
         blockhash = meta['x-container-block-hash']
         size = size if size is not None else fstat(fileobj.fileno()).st_size
         nblocks = 1 + (size - 1) // blocksize
         return (blocksize, blockhash, size, nblocks)
 
-    def _get_missing_hashes(
+    def _create_object_or_get_missing_hashes(
             self, obj, json,
             size=None,
             format='json',
             hashmap=True,
             content_type=None,
             self, obj, json,
             size=None,
             format='json',
             hashmap=True,
             content_type=None,
-            etag=None,
+            if_etag_match=None,
+            if_etag_not_match=None,
             content_encoding=None,
             content_disposition=None,
             permissions=None,
             content_encoding=None,
             content_disposition=None,
             permissions=None,
@@ -221,18 +263,16 @@ class PithosClient(PithosRestAPI):
             hashmap=True,
             content_type=content_type,
             json=json,
             hashmap=True,
             content_type=content_type,
             json=json,
-            etag=etag,
+            if_etag_match=if_etag_match,
+            if_etag_not_match=if_etag_not_match,
             content_encoding=content_encoding,
             content_disposition=content_disposition,
             permissions=permissions,
             public=public,
             success=success)
             content_encoding=content_encoding,
             content_disposition=content_disposition,
             permissions=permissions,
             public=public,
             success=success)
-        if r.status_code == 201:
-            r.release()
-            return None
-        return r.json
+        return (None if r.status_code == 201 else r.json), r.headers
 
 
-    def _culculate_blocks_for_upload(
+    def _calculate_blocks_for_upload(
             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
             hash_cb=None):
         offset = 0
             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
             hash_cb=None):
         offset = 0
@@ -264,7 +304,7 @@ class PithosClient(PithosRestAPI):
             offset, bytes = hmap[hash]
             fileobj.seek(offset)
             data = fileobj.read(bytes)
             offset, bytes = hmap[hash]
             fileobj.seek(offset)
             data = fileobj.read(bytes)
-            r = self._put_block_async(data, hash, upload_gen)
+            r = self._put_block_async(data, hash)
             flying.append(r)
             unfinished = self._watch_thread_limit(flying)
             for thread in set(flying).difference(unfinished):
             flying.append(r)
             unfinished = self._watch_thread_limit(flying)
             for thread in set(flying).difference(unfinished):
@@ -301,11 +341,14 @@ class PithosClient(PithosRestAPI):
             hash_cb=None,
             upload_cb=None,
             etag=None,
             hash_cb=None,
             upload_cb=None,
             etag=None,
+            if_etag_match=None,
+            if_not_exist=None,
             content_encoding=None,
             content_disposition=None,
             content_type=None,
             sharing=None,
             content_encoding=None,
             content_disposition=None,
             content_type=None,
             sharing=None,
-            public=None):
+            public=None,
+            container_info_cache=None):
         """Upload an object using multiple connections (threads)
 
         :param obj: (str) remote object path
         """Upload an object using multiple connections (threads)
 
         :param obj: (str) remote object path
@@ -318,6 +361,14 @@ class PithosClient(PithosRestAPI):
 
         :param etag: (str)
 
 
         :param etag: (str)
 
+        :param if_etag_match: (str) Value sent in the If-Match header at
+            file creation
+
+        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
+            it does not exist remotely, otherwise the operation will fail.
+            This covers the case where an object with the same path is
+            created while the object is being uploaded.
+
         :param content_encoding: (str)
 
         :param content_disposition: (str)
         :param content_encoding: (str)
 
         :param content_disposition: (str)
@@ -328,17 +379,20 @@ class PithosClient(PithosRestAPI):
             'write':[usr and/or grp names]}
 
         :param public: (bool)
             'write':[usr and/or grp names]}
 
         :param public: (bool)
+
+        :param container_info_cache: (dict) if given, avoid redundant calls to
+            server for container info (block size and hash information)
         """
         self._assert_container()
 
         """
         self._assert_container()
 
-        #init
-        block_info = (blocksize, blockhash, size, nblocks) =\
-            self._get_file_block_info(f, size)
+        block_info = (
+            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
+                f, size, container_info_cache)
         (hashes, hmap, offset) = ([], {}, 0)
         if not content_type:
             content_type = 'application/octet-stream'
 
         (hashes, hmap, offset) = ([], {}, 0)
         if not content_type:
             content_type = 'application/octet-stream'
 
-        self._culculate_blocks_for_upload(
+        self._calculate_blocks_for_upload(
             *block_info,
             hashes=hashes,
             hmap=hmap,
             *block_info,
             hashes=hashes,
             hmap=hmap,
@@ -346,18 +400,19 @@ class PithosClient(PithosRestAPI):
             hash_cb=hash_cb)
 
         hashmap = dict(bytes=size, hashes=hashes)
             hash_cb=hash_cb)
 
         hashmap = dict(bytes=size, hashes=hashes)
-        missing = self._get_missing_hashes(
+        missing, obj_headers = self._create_object_or_get_missing_hashes(
             obj, hashmap,
             content_type=content_type,
             size=size,
             obj, hashmap,
             content_type=content_type,
             size=size,
-            etag=etag,
+            if_etag_match=if_etag_match,
+            if_etag_not_match='*' if if_not_exist else None,
             content_encoding=content_encoding,
             content_disposition=content_disposition,
             permissions=sharing,
             public=public)
 
         if missing is None:
             content_encoding=content_encoding,
             content_disposition=content_disposition,
             permissions=sharing,
             public=public)
 
         if missing is None:
-            return
+            return obj_headers
 
         if upload_cb:
             upload_gen = upload_cb(len(missing))
 
         if upload_cb:
             upload_gen = upload_cb(len(missing))
@@ -389,7 +444,7 @@ class PithosClient(PithosRestAPI):
             if missing:
                 raise ClientError(
                     '%s blocks failed to upload' % len(missing),
             if missing:
                 raise ClientError(
                     '%s blocks failed to upload' % len(missing),
-                    status=800)
+                    details=['%s' % thread.exception for thread in missing])
         except KeyboardInterrupt:
             sendlog.info('- - - wait for threads to finish')
             for thread in activethreads():
         except KeyboardInterrupt:
             sendlog.info('- - - wait for threads to finish')
             for thread in activethreads():
@@ -401,9 +456,149 @@ class PithosClient(PithosRestAPI):
             format='json',
             hashmap=True,
             content_type=content_type,
             format='json',
             hashmap=True,
             content_type=content_type,
+            if_etag_match=if_etag_match,
+            if_etag_not_match='*' if if_not_exist else None,
+            etag=etag,
             json=hashmap,
             json=hashmap,
+            permissions=sharing,
+            public=public,
+            success=201)
+        return r.headers
+
+    def upload_from_string(
+            self, obj, input_str,
+            hash_cb=None,
+            upload_cb=None,
+            etag=None,
+            if_etag_match=None,
+            if_not_exist=None,
+            content_encoding=None,
+            content_disposition=None,
+            content_type=None,
+            sharing=None,
+            public=None,
+            container_info_cache=None):
+        """Upload an object using multiple connections (threads)
+
+        :param obj: (str) remote object path
+
+        :param input_str: (str) upload content
+
+        :param hash_cb: optional progress.bar object for calculating hashes
+
+        :param upload_cb: optional progress.bar object for uploading
+
+        :param etag: (str)
+
+        :param if_etag_match: (str) Value sent in the If-Match header at
+            file creation
+
+        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
+            it does not exist remotely, otherwise the operation will fail.
+            This covers the case where an object with the same path is
+            created while the object is being uploaded.
+
+        :param content_encoding: (str)
+
+        :param content_disposition: (str)
+
+        :param content_type: (str)
+
+        :param sharing: {'read':[user and/or grp names],
+            'write':[usr and/or grp names]}
+
+        :param public: (bool)
+
+        :param container_info_cache: (dict) if given, avoid redundant calls to
+            server for container info (block size and hash information)
+        """
+        self._assert_container()
+
+        blocksize, blockhash, size, nblocks = self._get_file_block_info(
+                fileobj=None, size=len(input_str), cache=container_info_cache)
+        (hashes, hmap, offset) = ([], {}, 0)
+        if not content_type:
+            content_type = 'application/octet-stream'
+
+        hashes = []
+        hmap = {}
+        for blockid in range(nblocks):
+            start = blockid * blocksize
+            block = input_str[start: (start + blocksize)]
+            hashes.append(_pithos_hash(block, blockhash))
+            hmap[hashes[blockid]] = (start, block)
+
+        hashmap = dict(bytes=size, hashes=hashes)
+        missing, obj_headers = self._create_object_or_get_missing_hashes(
+            obj, hashmap,
+            content_type=content_type,
+            size=size,
+            if_etag_match=if_etag_match,
+            if_etag_not_match='*' if if_not_exist else None,
+            content_encoding=content_encoding,
+            content_disposition=content_disposition,
+            permissions=sharing,
+            public=public)
+        if missing is None:
+            return obj_headers
+        num_of_missing = len(missing)
+
+        if upload_cb:
+            self.progress_bar_gen = upload_cb(nblocks)
+            for i in range(nblocks + 1 - num_of_missing):
+                self._cb_next()
+
+        tries = 7
+        old_failures = 0
+        try:
+            while tries and missing:
+                flying = []
+                failures = []
+                for hash in missing:
+                    offset, block = hmap[hash]
+                    bird = self._put_block_async(block, hash)
+                    flying.append(bird)
+                    unfinished = self._watch_thread_limit(flying)
+                    for thread in set(flying).difference(unfinished):
+                        if thread.exception:
+                            failures.append(thread.kwargs['hash'])
+                        if thread.isAlive():
+                            flying.append(thread)
+                        else:
+                            self._cb_next()
+                    flying = unfinished
+                for thread in flying:
+                    thread.join()
+                    if thread.exception:
+                        failures.append(thread.kwargs['hash'])
+                    self._cb_next()
+                missing = failures
+                if missing and len(missing) == old_failures:
+                    tries -= 1
+                old_failures = len(missing)
+            if missing:
+                raise ClientError(
+                    '%s blocks failed to upload' % len(missing),
+                    details=['%s' % thread.exception for thread in missing])
+        except KeyboardInterrupt:
+            sendlog.info('- - - wait for threads to finish')
+            for thread in activethreads():
+                thread.join()
+            raise
+
+        r = self.object_put(
+            obj,
+            format='json',
+            hashmap=True,
+            content_type=content_type,
+            if_etag_match=if_etag_match,
+            if_etag_not_match='*' if if_not_exist else None,
+            etag=etag,
+            json=hashmap,
+            permissions=sharing,
+            public=public,
             success=201)
             success=201)
-        r.release()
+        return r.headers
 
     # download_* auxiliary methods
     def _get_remote_blocks_info(self, obj, **restargs):
 
     # download_* auxiliary methods
     def _get_remote_blocks_info(self, obj, **restargs):
@@ -417,7 +612,11 @@ class PithosClient(PithosRestAPI):
         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
         map_dict = {}
         for i, h in enumerate(hashmap['hashes']):
         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
         map_dict = {}
         for i, h in enumerate(hashmap['hashes']):
-            map_dict[h] = i
+            #  map_dict[h] = i   CHANGE
+            if h in map_dict:
+                map_dict[h].append(i)
+            else:
+                map_dict[h] = [i]
         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
 
     def _dump_blocks_sync(
         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
 
     def _dump_blocks_sync(
@@ -447,62 +646,65 @@ class PithosClient(PithosRestAPI):
         h.update(block.strip('\x00'))
         return hexlify(h.digest())
 
         h.update(block.strip('\x00'))
         return hexlify(h.digest())
 
-    def _thread2file(self, flying, local_file, offset=0, **restargs):
+    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
         """write the results of a greenleted rest call to a file
 
         :param offset: the offset of the file up to blocksize
         - e.g. if the range is 10-100, all blocks will be written to
         normal_position - 10
         """
         """write the results of a greenleted rest call to a file
 
         :param offset: the offset of the file up to blocksize
         - e.g. if the range is 10-100, all blocks will be written to
         normal_position - 10
         """
-        finished = []
-        for i, (start, g) in enumerate(flying.items()):
-            if not g.isAlive():
-                if g.exception:
-                    raise g.exception
-                block = g.value.content
-                local_file.seek(start - offset)
+        for key, g in flying.items():
+            if g.isAlive():
+                continue
+            if g.exception:
+                raise g.exception
+            block = g.value.content
+            for block_start in blockids[key]:
+                local_file.seek(block_start + offset)
                 local_file.write(block)
                 self._cb_next()
                 local_file.write(block)
                 self._cb_next()
-                finished.append(flying.pop(start))
+            flying.pop(key)
+            blockids.pop(key)
         local_file.flush()
         local_file.flush()
-        return finished
 
     def _dump_blocks_async(
             self, obj, remote_hashes, blocksize, total_size, local_file,
             blockhash=None, resume=False, filerange=None, **restargs):
         file_size = fstat(local_file.fileno()).st_size if resume else 0
 
     def _dump_blocks_async(
             self, obj, remote_hashes, blocksize, total_size, local_file,
             blockhash=None, resume=False, filerange=None, **restargs):
         file_size = fstat(local_file.fileno()).st_size if resume else 0
-        flying = {}
-        finished = []
+        flying = dict()
+        blockid_dict = dict()
         offset = 0
         if filerange is not None:
             rstart = int(filerange.split('-')[0])
             offset = rstart if blocksize > rstart else rstart % blocksize
 
         self._init_thread_limit()
         offset = 0
         if filerange is not None:
             rstart = int(filerange.split('-')[0])
             offset = rstart if blocksize > rstart else rstart % blocksize
 
         self._init_thread_limit()
-        for block_hash, blockid in remote_hashes.items():
-            start = blocksize * blockid
-            if start < file_size and block_hash == self._hash_from_file(
-                    local_file, start, blocksize, blockhash):
-                self._cb_next()
-                continue
-            self._watch_thread_limit(flying.values())
-            finished += self._thread2file(
-                flying,
-                local_file,
-                offset,
-                **restargs)
-            end = total_size - 1 if start + blocksize > total_size\
-                else start + blocksize - 1
-            (start, end) = _range_up(start, end, filerange)
-            if start == end:
-                self._cb_next()
-                continue
-            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
-            flying[start] = self._get_block_async(obj, **restargs)
+        for block_hash, blockids in remote_hashes.items():
+            blockids = [blk * blocksize for blk in blockids]
+            unsaved = [blk for blk in blockids if not (
+                blk < file_size and block_hash == self._hash_from_file(
+                        local_file, blk, blocksize, blockhash))]
+            self._cb_next(len(blockids) - len(unsaved))
+            if unsaved:
+                key = unsaved[0]
+                self._watch_thread_limit(flying.values())
+                self._thread2file(
+                    flying, blockid_dict, local_file, offset,
+                    **restargs)
+                end = total_size - 1 if (
+                    key + blocksize > total_size) else key + blocksize - 1
+                start, end = _range_up(key, end, filerange)
+                if start == end:
+                    self._cb_next()
+                    continue
+                restargs['async_headers'] = {
+                    'Range': 'bytes=%s-%s' % (start, end)}
+                flying[key] = self._get_block_async(obj, **restargs)
+                blockid_dict[key] = unsaved
 
         for thread in flying.values():
             thread.join()
 
         for thread in flying.values():
             thread.join()
-        finished += self._thread2file(flying, local_file, offset, **restargs)
+        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
 
     def download_object(
             self, obj, dst,
 
     def download_object(
             self, obj, dst,
@@ -552,7 +754,7 @@ class PithosClient(PithosRestAPI):
         assert total_size >= 0
 
         if download_cb:
         assert total_size >= 0
 
         if download_cb:
-            self.progress_bar_gen = download_cb(len(remote_hashes))
+            self.progress_bar_gen = download_cb(len(hash_list))
             self._cb_next()
 
         if dst.isatty():
             self._cb_next()
 
         if dst.isatty():
@@ -580,11 +782,91 @@ class PithosClient(PithosRestAPI):
 
         self._complete_cb()
 
 
         self._complete_cb()
 
+    def download_to_string(
+            self, obj,
+            download_cb=None,
+            version=None,
+            range_str=None,
+            if_match=None,
+            if_none_match=None,
+            if_modified_since=None,
+            if_unmodified_since=None):
+        """Download an object to a string (multiple connections). This method
+        uses threads for http requests, but stores all content in memory.
+
+        :param obj: (str) remote object path
+
+        :param download_cb: optional progress.bar object for downloading
+
+        :param version: (str) file version
+
+        :param range_str: (str) from, to are file positions (int) in bytes
+
+        :param if_match: (str)
+
+        :param if_none_match: (str)
+
+        :param if_modified_since: (str) formatted date
+
+        :param if_unmodified_since: (str) formatted date
+
+        :returns: (str) the whole object contents
+        """
+        restargs = dict(
+            version=version,
+            data_range=None if range_str is None else 'bytes=%s' % range_str,
+            if_match=if_match,
+            if_none_match=if_none_match,
+            if_modified_since=if_modified_since,
+            if_unmodified_since=if_unmodified_since)
+
+        (
+            blocksize,
+            blockhash,
+            total_size,
+            hash_list,
+            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
+        assert total_size >= 0
+
+        if download_cb:
+            self.progress_bar_gen = download_cb(len(hash_list))
+            self._cb_next()
+
+        num_of_blocks = len(remote_hashes)
+        ret = [''] * num_of_blocks
+        self._init_thread_limit()
+        flying = dict()
+        try:
+            for blockid, blockhash in enumerate(remote_hashes):
+                start = blocksize * blockid
+                is_last = start + blocksize > total_size
+                end = (total_size - 1) if is_last else (start + blocksize - 1)
+                (start, end) = _range_up(start, end, range_str)
+                if start < end:
+                    self._watch_thread_limit(flying.values())
+                    flying[blockid] = self._get_block_async(obj, **restargs)
+                for runid, thread in flying.items():
+                    if (blockid + 1) == num_of_blocks:
+                        thread.join()
+                    elif thread.isAlive():
+                        continue
+                    if thread.exception:
+                        raise thread.exception
+                    ret[runid] = thread.value.content
+                    self._cb_next()
+                    flying.pop(runid)
+            return ''.join(ret)
+        except KeyboardInterrupt:
+            sendlog.info('- - - wait for threads to finish')
+            for thread in activethreads():
+                thread.join()
+
     #Command Progress Bar method
     #Command Progress Bar method
-    def _cb_next(self):
+    def _cb_next(self, step=1):
         if hasattr(self, 'progress_bar_gen'):
             try:
         if hasattr(self, 'progress_bar_gen'):
             try:
-                self.progress_bar_gen.next()
+                for i in xrange(step):
+                    self.progress_bar_gen.next()
             except:
                 pass
 
             except:
                 pass
 
@@ -642,14 +924,13 @@ class PithosClient(PithosRestAPI):
         :param usernames: (list)
         """
         r = self.account_post(update=True, groups={group: usernames})
         :param usernames: (list)
         """
         r = self.account_post(update=True, groups={group: usernames})
-        r.release()
+        return r
 
     def del_account_group(self, group):
         """
         :param group: (str)
         """
 
     def del_account_group(self, group):
         """
         :param group: (str)
         """
-        r = self.account_post(update=True, groups={group: []})
-        r.release()
+        self.account_post(update=True, groups={group: []})
 
     def get_account_info(self, until=None):
         """
 
     def get_account_info(self, until=None):
         """
@@ -671,18 +952,18 @@ class PithosClient(PithosRestAPI):
             'X-Account-Policy-Quota',
             exactMatch=True)
 
             'X-Account-Policy-Quota',
             exactMatch=True)
 
-    def get_account_versioning(self):
-        """
-        :returns: (dict)
-        """
-        return filter_in(
-            self.get_account_info(),
-            'X-Account-Policy-Versioning',
-            exactMatch=True)
+    #def get_account_versioning(self):
+    #    """
+    #    :returns: (dict)
+    #    """
+    #    return filter_in(
+    #        self.get_account_info(),
+    #        'X-Account-Policy-Versioning',
+    #        exactMatch=True)
 
     def get_account_meta(self, until=None):
         """
 
     def get_account_meta(self, until=None):
         """
-        :meta until: (str) formated date
+        :param until: (str) formatted date
 
         :returns: (dict)
         """
 
         :returns: (dict)
         """
@@ -700,28 +981,27 @@ class PithosClient(PithosRestAPI):
         """
         assert(type(metapairs) is dict)
         r = self.account_post(update=True, metadata=metapairs)
         """
         assert(type(metapairs) is dict)
         r = self.account_post(update=True, metadata=metapairs)
-        r.release()
+        return r.headers
 
     def del_account_meta(self, metakey):
         """
         :param metakey: (str) metadatum key
         """
         r = self.account_post(update=True, metadata={metakey: ''})
 
     def del_account_meta(self, metakey):
         """
         :param metakey: (str) metadatum key
         """
         r = self.account_post(update=True, metadata={metakey: ''})
-        r.release()
+        return r.headers
 
 
-    def set_account_quota(self, quota):
-        """
-        :param quota: (int)
-        """
-        r = self.account_post(update=True, quota=quota)
-        r.release()
+    #def set_account_quota(self, quota):
+    #    """
+    #    :param quota: (int)
+    #    """
+    #    self.account_post(update=True, quota=quota)
 
 
-    def set_account_versioning(self, versioning):
-        """
-        "param versioning: (str)
-        """
-        r = self.account_post(update=True, versioning=versioning)
-        r.release()
+    #def set_account_versioning(self, versioning):
+    #    """
+    #    :param versioning: (str)
+    #    """
+    #    r = self.account_post(update=True, versioning=versioning)
+    #    return r.headers
 
     def list_containers(self):
         """
 
     def list_containers(self):
         """
@@ -745,7 +1025,6 @@ class PithosClient(PithosRestAPI):
             until=until,
             delimiter=delimiter,
             success=(204, 404, 409))
             until=until,
             delimiter=delimiter,
             success=(204, 404, 409))
-        r.release()
         if r.status_code == 404:
             raise ClientError(
                 'Container "%s" does not exist' % self.container,
         if r.status_code == 404:
             raise ClientError(
                 'Container "%s" does not exist' % self.container,
@@ -754,6 +1033,7 @@ class PithosClient(PithosRestAPI):
             raise ClientError(
                 'Container "%s" is not empty' % self.container,
                 r.status_code)
             raise ClientError(
                 'Container "%s" is not empty' % self.container,
                 r.status_code)
+        return r.headers
 
     def get_container_versioning(self, container=None):
         """
 
     def get_container_versioning(self, container=None):
         """
@@ -770,7 +1050,7 @@ class PithosClient(PithosRestAPI):
         finally:
             self.container = cnt_back_up
 
         finally:
             self.container = cnt_back_up
 
-    def get_container_quota(self, container=None):
+    def get_container_limit(self, container=None):
         """
         :param container: (str)
 
         """
         :param container: (str)
 
@@ -826,28 +1106,30 @@ class PithosClient(PithosRestAPI):
         """
         assert(type(metapairs) is dict)
         r = self.container_post(update=True, metadata=metapairs)
         """
         assert(type(metapairs) is dict)
         r = self.container_post(update=True, metadata=metapairs)
-        r.release()
+        return r.headers
 
     def del_container_meta(self, metakey):
         """
         :param metakey: (str) metadatum key
 
     def del_container_meta(self, metakey):
         """
         :param metakey: (str) metadatum key
+
+        :returns: (dict) response headers
         """
         r = self.container_post(update=True, metadata={metakey: ''})
         """
         r = self.container_post(update=True, metadata={metakey: ''})
-        r.release()
+        return r.headers
 
 
-    def set_container_quota(self, quota):
+    def set_container_limit(self, limit):
         """
         """
-        :param quota: (int)
+        :param limit: (int)
         """
         """
-        r = self.container_post(update=True, quota=quota)
-        r.release()
+        r = self.container_post(update=True, quota=limit)
+        return r.headers
 
     def set_container_versioning(self, versioning):
         """
         :param versioning: (str)
         """
         r = self.container_post(update=True, versioning=versioning)
 
     def set_container_versioning(self, versioning):
         """
         :param versioning: (str)
         """
         r = self.container_post(update=True, versioning=versioning)
-        r.release()
+        return r.headers
 
     def del_object(self, obj, until=None, delimiter=None):
         """
 
     def del_object(self, obj, until=None, delimiter=None):
         """
@@ -859,7 +1141,7 @@ class PithosClient(PithosRestAPI):
         """
         self._assert_container()
         r = self.object_delete(obj, until=until, delimiter=delimiter)
         """
         self._assert_container()
         r = self.object_delete(obj, until=until, delimiter=delimiter)
-        r.release()
+        return r.headers
 
     def set_object_meta(self, obj, metapairs):
         """
 
     def set_object_meta(self, obj, metapairs):
         """
@@ -869,7 +1151,7 @@ class PithosClient(PithosRestAPI):
         """
         assert(type(metapairs) is dict)
         r = self.object_post(obj, update=True, metadata=metapairs)
         """
         assert(type(metapairs) is dict)
         r = self.object_post(obj, update=True, metadata=metapairs)
-        r.release()
+        return r.headers
 
     def del_object_meta(self, obj, metakey):
         """
 
     def del_object_meta(self, obj, metakey):
         """
@@ -878,7 +1160,7 @@ class PithosClient(PithosRestAPI):
         :param metakey: (str) metadatum key
         """
         r = self.object_post(obj, update=True, metadata={metakey: ''})
         :param metakey: (str) metadatum key
         """
         r = self.object_post(obj, update=True, metadata={metakey: ''})
-        r.release()
+        return r.headers
 
     def publish_object(self, obj):
         """
 
     def publish_object(self, obj):
         """
@@ -886,8 +1168,7 @@ class PithosClient(PithosRestAPI):
 
         :returns: (str) access url
         """
 
         :returns: (str) access url
         """
-        r = self.object_post(obj, update=True, public=True)
-        r.release()
+        self.object_post(obj, update=True, public=True)
         info = self.get_object_info(obj)
         pref, sep, rest = self.base_url.partition('//')
         base = rest.split('/')[0]
         info = self.get_object_info(obj)
         pref, sep, rest = self.base_url.partition('//')
         base = rest.split('/')[0]
@@ -898,7 +1179,7 @@ class PithosClient(PithosRestAPI):
         :param obj: (str) remote object path
         """
         r = self.object_post(obj, update=True, public=False)
         :param obj: (str) remote object path
         """
         r = self.object_post(obj, update=True, public=False)
-        r.release()
+        return r.headers
 
     def get_object_info(self, obj, version=None):
         """
 
     def get_object_info(self, obj, version=None):
         """
@@ -952,29 +1233,31 @@ class PithosClient(PithosRestAPI):
 
     def set_object_sharing(
             self, obj,
 
     def set_object_sharing(
             self, obj,
-            read_permition=False, write_permition=False):
+            read_permission=False, write_permission=False):
         """Give read/write permisions to an object.
 
         :param obj: (str) remote object path
 
         """Give read/write permisions to an object.
 
         :param obj: (str) remote object path
 
-        :param read_permition: (list - bool) users and user groups that get
-            read permition for this object - False means all previous read
+        :param read_permission: (list - bool) users and user groups that get
+            read permission for this object - False means all previous read
             permissions will be removed
 
             permissions will be removed
 
-        :param write_perimition: (list - bool) of users and user groups to get
-           write permition for this object - False means all previous write
+        :param write_permission: (list - bool) of users and user groups to get
+           write permission for this object - False means all previous write
            permissions will be removed
            permissions will be removed
+
+        :returns: (dict) response headers
         """
 
         """
 
-        perms = dict(read=read_permition or '', write=write_permition or '')
+        perms = dict(read=read_permission or '', write=write_permission or '')
         r = self.object_post(obj, update=True, permissions=perms)
         r = self.object_post(obj, update=True, permissions=perms)
-        r.release()
+        return r.headers
 
     def del_object_sharing(self, obj):
         """
         :param obj: (str) remote object path
         """
 
     def del_object_sharing(self, obj):
         """
         :param obj: (str) remote object path
         """
-        self.set_object_sharing(obj)
+        return self.set_object_sharing(obj)
 
     def append_object(self, obj, source_file, upload_cb=None):
         """
 
     def append_object(self, obj, source_file, upload_cb=None):
         """
@@ -984,36 +1267,62 @@ class PithosClient(PithosRestAPI):
 
         :param upload_cb: progress.bar for uploading
         """
 
         :param upload_cb: progress.bar for uploading
         """
-
         self._assert_container()
         meta = self.get_container_info()
         blocksize = int(meta['x-container-block-size'])
         filesize = fstat(source_file.fileno()).st_size
         nblocks = 1 + (filesize - 1) // blocksize
         offset = 0
         self._assert_container()
         meta = self.get_container_info()
         blocksize = int(meta['x-container-block-size'])
         filesize = fstat(source_file.fileno()).st_size
         nblocks = 1 + (filesize - 1) // blocksize
         offset = 0
+        headers = {}
         if upload_cb:
         if upload_cb:
-            upload_gen = upload_cb(nblocks)
-            upload_gen.next()
-        for i in range(nblocks):
-            block = source_file.read(min(blocksize, filesize - offset))
-            offset += len(block)
-            r = self.object_post(
-                obj,
-                update=True,
-                content_range='bytes */*',
-                content_type='application/octet-stream',
-                content_length=len(block),
-                data=block)
-            r.release()
-
-            if upload_cb:
-                upload_gen.next()
+            self.progress_bar_gen = upload_cb(nblocks)
+            self._cb_next()
+        flying = {}
+        self._init_thread_limit()
+        try:
+            for i in range(nblocks):
+                block = source_file.read(min(blocksize, filesize - offset))
+                offset += len(block)
+
+                self._watch_thread_limit(flying.values())
+                unfinished = {}
+                flying[i] = SilentEvent(
+                    method=self.object_post,
+                    obj=obj,
+                    update=True,
+                    content_range='bytes */*',
+                    content_type='application/octet-stream',
+                    content_length=len(block),
+                    data=block)
+                flying[i].start()
+
+                for key, thread in flying.items():
+                    if thread.isAlive():
+                        if i < nblocks:
+                            unfinished[key] = thread
+                            continue
+                        thread.join()
+                    if thread.exception:
+                        raise thread.exception
+                    headers[key] = thread.value.headers
+                    self._cb_next()
+                flying = unfinished
+        except KeyboardInterrupt:
+            sendlog.info('- - - wait for threads to finish')
+            for thread in activethreads():
+                thread.join()
+        finally:
+            from time import sleep
+            sleep(2 * len(activethreads()))
+        return headers.values()
 
     def truncate_object(self, obj, upto_bytes):
         """
         :param obj: (str) remote object path
 
         :param upto_bytes: max number of bytes to leave on file
 
     def truncate_object(self, obj, upto_bytes):
         """
         :param obj: (str) remote object path
 
         :param upto_bytes: max number of bytes to leave on file
+
+        :returns: (dict) response headers
         """
         r = self.object_post(
             obj,
         """
         r = self.object_post(
             obj,
@@ -1022,7 +1331,7 @@ class PithosClient(PithosRestAPI):
             content_type='application/octet-stream',
             object_bytes=upto_bytes,
             source_object=path4url(self.container, obj))
             content_type='application/octet-stream',
             object_bytes=upto_bytes,
             source_object=path4url(self.container, obj))
-        r.release()
+        return r.headers
 
     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
         """Overwrite a part of an object from local source file
 
     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
         """Overwrite a part of an object from local source file
@@ -1056,8 +1365,9 @@ class PithosClient(PithosRestAPI):
         nblocks = 1 + (datasize - 1) // blocksize
         offset = 0
         if upload_cb:
         nblocks = 1 + (datasize - 1) // blocksize
         offset = 0
         if upload_cb:
-            upload_gen = upload_cb(nblocks)
-            upload_gen.next()
+            self.progress_bar_gen = upload_cb(nblocks)
+            self._cb_next()
+        headers = []
         for i in range(nblocks):
             read_size = min(blocksize, filesize - offset, datasize - offset)
             block = source_file.read(read_size)
         for i in range(nblocks):
             read_size = min(blocksize, filesize - offset, datasize - offset)
             block = source_file.read(read_size)
@@ -1070,11 +1380,11 @@ class PithosClient(PithosRestAPI):
                     start + offset,
                     start + offset + len(block) - 1),
                 data=block)
                     start + offset,
                     start + offset + len(block) - 1),
                 data=block)
+            headers.append(dict(r.headers))
             offset += len(block)
             offset += len(block)
-            r.release()
 
 
-            if upload_cb:
-                upload_gen.next()
+            self._cb_next()
+        return headers
 
     def copy_object(
             self, src_container, src_object, dst_container,
 
     def copy_object(
             self, src_container, src_object, dst_container,
@@ -1102,6 +1412,8 @@ class PithosClient(PithosRestAPI):
         :param content_type: (str)
 
         :param delimiter: (str)
         :param content_type: (str)
 
         :param delimiter: (str)
+
+        :returns: (dict) response headers
         """
         self._assert_account()
         self.container = dst_container
         """
         self._assert_account()
         self.container = dst_container
@@ -1116,7 +1428,7 @@ class PithosClient(PithosRestAPI):
             public=public,
             content_type=content_type,
             delimiter=delimiter)
             public=public,
             content_type=content_type,
             delimiter=delimiter)
-        r.release()
+        return r.headers
 
     def move_object(
             self, src_container, src_object, dst_container,
 
     def move_object(
             self, src_container, src_object, dst_container,
@@ -1144,6 +1456,8 @@ class PithosClient(PithosRestAPI):
         :param content_type: (str)
 
         :param delimiter: (str)
         :param content_type: (str)
 
         :param delimiter: (str)
+
+        :returns: (dict) response headers
         """
         self._assert_account()
         self.container = dst_container
         """
         self._assert_account()
         self.container = dst_container
@@ -1159,7 +1473,7 @@ class PithosClient(PithosRestAPI):
             public=public,
             content_type=content_type,
             delimiter=delimiter)
             public=public,
             content_type=content_type,
             delimiter=delimiter)
-        r.release()
+        return r.headers
 
     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
         """Get accounts that share with self.account
 
     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
         """Get accounts that share with self.account