Optimize download_to_string by using threads
author Stavros Sachtouris <saxtouri@admin.grnet.gr>
Thu, 16 May 2013 14:17:58 +0000 (17:17 +0300)
committer Stavros Sachtouris <saxtouri@admin.grnet.gr>
Thu, 16 May 2013 14:17:58 +0000 (17:17 +0300)
Refs: #3608

kamaki/clients/pithos/__init__.py

index 87c7103..74dd913 100644 (file)
@@ -494,7 +494,7 @@ class PithosClient(PithosRestClient):
         - e.g. if the range is 10-100, all blocks will be written to
         normal_position - 10
         """
-        for i, (key, g) in enumerate(flying.items()):
+        for key, g in flying.items():
             if g.isAlive():
                 continue
             if g.exception:
@@ -532,8 +532,8 @@ class PithosClient(PithosRestClient):
                 self._thread2file(
                     flying, blockid_dict, local_file, offset,
                     **restargs)
-                end = total_size - 1 if key + blocksize > total_size\
-                    else key + blocksize - 1
+                end = total_size - 1 if (
+                    key + blocksize > total_size) else key + blocksize - 1
                 start, end = _range_up(key, end, filerange)
                 if start == end:
                     self._cb_next()
@@ -632,7 +632,8 @@ class PithosClient(PithosRestClient):
             if_none_match=None,
             if_modified_since=None,
             if_unmodified_since=None):
-        """Download an object to a string (multiple connections)
+        """Download an object to a string (multiple connections). This method
+        uses threads for http requests, but stores all content in memory.
 
         :param obj: (str) remote object path
 
@@ -672,21 +673,29 @@ class PithosClient(PithosRestClient):
             self.progress_bar_gen = download_cb(len(hash_list))
             self._cb_next()
 
-        ret = ''
+        num_of_blocks = len(remote_hashes)
+        ret = [''] * num_of_blocks
+        self._init_thread_limit()
+        flying = dict()
         for blockid, blockhash in enumerate(remote_hashes):
             start = blocksize * blockid
             is_last = start + blocksize > total_size
             end = (total_size - 1) if is_last else (start + blocksize - 1)
             (start, end) = _range_up(start, end, range_str)
-            if start == end:
-                continue
-            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
-            r = self.object_get(obj, success=(200, 206), **restargs)
-            ret += r.content
-            self._cb_next()
-
-        self._complete_cb()
-        return ret
+            if start < end:
+                self._watch_thread_limit(flying.values())
+                flying[blockid] = self._get_block_async(obj, **restargs)
+            for runid, thread in flying.items():
+                if (blockid + 1) == num_of_blocks:
+                    thread.join()
+                elif thread.isAlive():
+                    continue
+                if thread.exception:
+                    raise thread.exception
+                ret[runid] = thread.value.content
+                self._cb_next()
+                flying.pop(runid)
+        return ''.join(ret)
 
     #Command Progress Bar method
     def _cb_next(self, step=1):