Minimize requests when downloading the same block
author: Stavros Sachtouris <saxtouri@admin.grnet.gr>
Mon, 8 Apr 2013 15:50:01 +0000 (18:50 +0300)
committer: Stavros Sachtouris <saxtouri@admin.grnet.gr>
Mon, 8 Apr 2013 15:50:01 +0000 (18:50 +0300)
If a file contains multiple identical blocks, download only one of them and
write it to every position in the local file where that block occurs

kamaki/clients/pithos/__init__.py

index 4298629..2b7f69a 100644 (file)
@@ -442,32 +442,33 @@ class PithosClient(PithosRestClient):
         h.update(block.strip('\x00'))
         return hexlify(h.digest())
 
-    def _thread2file(self, flying, local_file, offset=0, **restargs):
+    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
         """write the results of a greenleted rest call to a file
 
         :param offset: the offset of the file up to blocksize
         - e.g. if the range is 10-100, all blocks will be written to
         normal_position - 10
         """
-        finished = []
-        for i, (start, g) in enumerate(flying.items()):
-            if not g.isAlive():
-                if g.exception:
-                    raise g.exception
-                block = g.value.content
-                local_file.seek(start - offset)
+        for i, (key, g) in enumerate(flying.items()):
+            if g.isAlive():
+                continue
+            if g.exception:
+                raise g.exception
+            block = g.value.content
+            for block_start in blockids[key]:
+                local_file.seek(block_start + offset)
                 local_file.write(block)
                 self._cb_next()
-                finished.append(flying.pop(start))
+            flying.pop(key)
+            blockids.pop(key)
         local_file.flush()
-        return finished
 
     def _dump_blocks_async(
             self, obj, remote_hashes, blocksize, total_size, local_file,
             blockhash=None, resume=False, filerange=None, **restargs):
         file_size = fstat(local_file.fileno()).st_size if resume else 0
-        flying = {}
-        finished = []
+        flying = dict()
+        blockid_dict = dict()
         offset = 0
         if filerange is not None:
             rstart = int(filerange.split('-')[0])
@@ -475,31 +476,31 @@ class PithosClient(PithosRestClient):
 
         self._init_thread_limit()
         for block_hash, blockids in remote_hashes.items():
-            for blockid in blockids:
-                start = blocksize * blockid
-                if start < file_size and block_hash == self._hash_from_file(
-                        local_file, start, blocksize, blockhash):
-                    self._cb_next()
-                    continue
+            blockids = [blk * blocksize for blk in blockids]
+            unsaved = [blk for blk in blockids if not (
+                blk < file_size and block_hash == self._hash_from_file(
+                        local_file, blk, blocksize, blockhash))]
+            self._cb_next(len(blockids) - len(unsaved))
+            if unsaved:
+                key = unsaved[0]
                 self._watch_thread_limit(flying.values())
-                finished += self._thread2file(
-                    flying,
-                    local_file,
-                    offset,
+                self._thread2file(
+                    flying, blockid_dict, local_file, offset,
                     **restargs)
-                end = total_size - 1 if start + blocksize > total_size\
-                    else start + blocksize - 1
-                (start, end) = _range_up(start, end, filerange)
+                end = total_size - 1 if key + blocksize > total_size\
+                    else key + blocksize - 1
+                start, end = _range_up(key, end, filerange)
                 if start == end:
                     self._cb_next()
                     continue
                 restargs['async_headers'] = {
                     'Range': 'bytes=%s-%s' % (start, end)}
-                flying[start] = self._get_block_async(obj, **restargs)
+                flying[key] = self._get_block_async(obj, **restargs)
+                blockid_dict[key] = unsaved
 
         for thread in flying.values():
             thread.join()
-        finished += self._thread2file(flying, local_file, offset, **restargs)
+        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
 
     def download_object(
             self, obj, dst,
@@ -578,17 +579,17 @@ class PithosClient(PithosRestClient):
         self._complete_cb()
 
     #Command Progress Bar method
-    def _cb_next(self):
+    def _cb_next(self, step=1):
         if hasattr(self, 'progress_bar_gen'):
             try:
-                self.progress_bar_gen.next()
+                self.progress_bar_gen.next(step)
             except:
                 pass
 
     def _complete_cb(self):
         while True:
             try:
-                self.progress_bar_gen.next()
+                self.progress_bar_gen.next(step)
             except:
                 break