Refactor pithos+ download
author Stavros Sachtouris <saxtouri@admin.grnet.gr>
Fri, 14 Sep 2012 12:50:47 +0000 (15:50 +0300)
committer Stavros Sachtouris <saxtouri@admin.grnet.gr>
Fri, 14 Sep 2012 12:50:47 +0000 (15:50 +0300)
Three modes:
a. sequential
b. sequential with resume
c. asynchronous/parallel
Known bug: multi-download still does not work correctly
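In outline, the refactored download_object() chooses between the three modes as follows (a simplified sketch of the dispatch logic in the diff below, not verbatim code):

    # mode dispatch in PithosClient.download_object (simplified)
    if dst.isatty():
        # (a) sequential: a stream such as stdout cannot seek,
        # so blocks are fetched and written strictly in order
        self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
    elif resume:
        # (b) sequential with resume: hash the local file first and
        # skip every block that already matches the remote hashmap
        self._filter_out_downloaded_hashes(remote_hashes, hash_list, dst, blocksize, blockhash)
        self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
    else:
        # (c) asynchronous/parallel: one greenlet per block, each result
        # written at its own offset as the request completes
        self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, **restargs)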

kamaki/cli/commands/pithos_cli.py
kamaki/clients/pithos.py

diff --git a/kamaki/cli/commands/pithos_cli.py b/kamaki/cli/commands/pithos_cli.py
index bc5449e..f88c0e0 100644
@@ -567,8 +567,8 @@ class store_download(_store_container_command):
         super(self.__class__, self).update_parser(parser)
         parser.add_argument('--no-progress-bar', action='store_true', dest='no_progress_bar',
             default=False, help='Dont display progress bars')
-        parser.add_argument('--overide', action='store_true', dest='overide', default=False,
-            help='Force download to overide an existing file')
+        parser.add_argument('--resume', action='store_true', dest='resume', default=False,
+            help='Enable download resume (slower)')
         parser.add_argument('--range', action='store', dest='range', default=None,
             help='show range of data')
         parser.add_argument('--if-match', action='store', dest='if_match', default=None,
@@ -591,10 +591,10 @@ class store_download(_store_container_command):
             out = stdout
         else:
             try:
-                if getattr(self.args, 'overide'):
-                    out = open(local_path, 'wb+')
-                else:
+                if getattr(self.args, 'resume'):
                     out = open(local_path, 'ab+')
+                else:
+                    out = open(local_path, 'wb+')
             except IOError as err:
                 raise CLIError(message='Cannot write to file %s - %s'%(local_path,unicode(err)),
                     importance=1)
@@ -605,7 +605,7 @@ class store_download(_store_container_command):
         try:
             self.client.download_object(self.path, out, download_cb,
                 range=getattr(self.args, 'range'), version=getattr(self.args,'object_version'),
-                if_match=getattr(self.args, 'if_match'), overide=getattr(self.args, 'overide'),
+                if_match=getattr(self.args, 'if_match'), resume=getattr(self.args, 'resume'),
                 if_none_match=getattr(self.args, 'if_none_match'),
                 if_modified_since=getattr(self.args, 'if_modified_since'),
                 if_unmodified_since=getattr(self.args, 'if_unmodified_since'))
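
The resume switch above relies on plain Python file-mode semantics (standard library behavior, nothing kamaki-specific):

    out = open(local_path, 'ab+')  # keeps existing bytes, so resume can hash and skip them
    out = open(local_path, 'wb+')  # truncates to zero: any partial download is lost
    # In append ('a') mode all writes land at the end of the file, which is
    # why the resume path downloads the remaining blocks strictly in order.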
diff --git a/kamaki/clients/pithos.py b/kamaki/clients/pithos.py
index 6eb34ba..6650905 100644
@@ -210,97 +210,51 @@ class PithosClient(PithosRestAPI):
 
         self.object_put(obj, format='json', hashmap=True, content_type=content_type, 
             json=hashmap, success=201)
-      
+    
     #download_* auxiliary methods
-    def _get_object_block_info(self,obj, **kwargs):
+    def _get_remote_blocks_info(self, obj, **restargs):
         #retrieve object hashmap
-        hashmap = self.get_object_hashmap(obj, **kwargs)
+        hashmap = self.get_object_hashmap(obj, **restargs)
         blocksize = int(hashmap['block_size'])
         blockhash = hashmap['block_hash']
         total_size = hashmap['bytes']
-        hmap = hashmap['hashes']
+        print('total_size:%s, blocksize:%s, x/y:%s, len:%s'%(total_size, blocksize,
+            total_size/blocksize + 1, len(hashmap['hashes'])))
+        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
         map_dict = {}
-        for h in hmap:
-            map_dict[h] = True
-        return (blocksize, blockhash, total_size, hmap, map_dict)
+        for i, h in enumerate(hashmap['hashes']):
+            map_dict[h] = i
+        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
 
-    def _get_range_limits(self, range):
-        try:
-            (custom_start, custom_end) = range.split('-')
-            (custom_start, custom_end) = (int(custom_start), int(custom_end))
-        except ValueError:
-            raise ClientError(message='Invalid range string', status=601)
-        if custom_start > custom_end or custom_start < 0:
-            raise ClientError(message='Negative range', status=601)
-        elif custom_start == custom_end:
-            return
-        elif custom_end > total_size:
-            raise ClientError(message='Range exceeds file size', status=601)
-        return (custom_start, custom_end)
-
-    def _get_downloaded_blocks(self, hmap, fileobj, blocksize, blockhash, map_dict,
-        overide=False, download_gen=None):
-        if fileobj.isatty() or not path.exists(fileobj.name):
-            return {}
-        h = HashMap(blocksize, blockhash)
-        with_progress_bar = False if download_gen is None else True
-        h.load(fileobj, with_progress_bar)
-        resumed = {}
-        for i, x in enumerate(h):
-            existing_hash = hexlify(x)
-            if existing_hash in map_dict:
-        #resume if some blocks have been downloaded
-                resumed[existing_hash] = i
-                if with_progress_bar:
-                    try:
-                        download_gen.next()
-                    except:
-                        pass
-            elif not overide:
-                raise ClientError(message='Local file is substantialy different',
-                    status=600)
-        return resumed
-
-    def _get_block_range(self, blockid, blocksize, total_size, custom_start, custom_end):
-        start = blockid*blocksize
-        if custom_start is not None:
-            if start < custom_start:
-                start = custom_start
-            elif start > custom_end:
-                return (None, None)
-        end = start + blocksize -1 if start+blocksize < total_size else total_size -1
-        if custom_end is not None and end > custom_end:
-            end = custom_end
-        return (start, end)
-
-    def _manage_downloading_greenlets(self, flying, objfile, broken_greenlets = [], sleeptime=0):
-        newflying = []
-        for v in flying:
-            h = v['handler']
-            if h.ready():
-                if h.exception:
-                    h.release()
-                    raise h.exception
-                try:
-                    block = h.value.content
-                except AttributeError:
-                    #catch greenlets that break due to an eventlist bug
-                    print('- - - - - > Got a borken greenlet here')
-                    broken_greenlets.append(v)
-                    continue
-                objfile.seek(v['start'])
-                objfile.write(block)
-                objfile.flush()
+    def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, **restargs):
+        for blockid, blockhash in enumerate(remote_hashes):
+            if blockhash is None:
+                continue
+            start = blocksize*blockid
+            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
+            restargs['data_range'] = 'bytes=%s-%s'%(start, end)
+            r = self.object_get(obj, success=(200, 206), **restargs)
+            self._cb_next()
+            dst.write(r.content)
+            dst.flush()
+
+    def _filter_out_downloaded_hashes(self, remote_hashes, hash_list, local_file, blocksize,
+        blockhash):
+        #load file hashmap
+        file_hashmap = HashMap(blocksize, blockhash)
+        file_hashmap.load(local_file, hasattr(self, 'progress_bar_gen'))
+
+        #filter out blocks that are already downloaded
+        for i, x in enumerate(file_hashmap):
+            local_hash = hexlify(x)
+            if local_hash in remote_hashes:
+                blockid = remote_hashes.pop(local_hash)
+                hash_list[blockid] = None
+                self._cb_next()
             else:
-                #if there are unfinished greenlets, sleep for some time - be carefull with that
-                sleep(sleeptime)
-                newflying.append(v)
-        return newflying
-
-    def _get_block(self, obj, **kwargs):
-        return self.object_get(obj, success=(200, 206), binary=True, **kwargs)
+                raise ClientError(message='Local file is substantially different', status=600)
 
-    def _get_block_async(self, obj, **kwargs):
+    def _get_block_async(self, obj, **restargs):
         class SilentGreenlet(gevent.Greenlet):
             def _report_error(self, exc_info):
                 _stderr = sys._stderr
@@ -309,116 +263,113 @@ class PithosClient(PithosRestAPI):
                     gevent.Greenlet._report_error(self, exc_info)
                 finally:
                     sys.stderr = _stderr
-        POOL_SIZE =7
+        self.POOL_SIZE = 5
         if self.async_pool is None:
-            self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
-        g = SilentGreenlet(self._get_block, obj, **kwargs)
+            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
+        g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
         self.async_pool.start(g)
         return g
 
-    def _async_download_missing_blocks(self, obj, objfile, hmap, resumed, blocksize, total_size, 
-        download_gen=None, custom_start = None, custom_end=None, **restargs):
-        """Attempt pseudo-multithreaded (with greenlets) download of blocks, or if that
-        is not possible retreat to sequensial block download
-        """
-
-        flying = []
-        for i, h in enumerate(hmap):
-            if h in resumed:
-                continue
-            if download_gen:
-                try:
-                    download_gen.next()
-                except StopIteration:
-                    pass
-            (start, end) = self._get_block_range(i, blocksize, total_size, custom_start, custom_end)
-            if start is None:
-                continue
-            data_range = 'bytes=%s-%s'%(start, end)
-            handler = self._get_block_async(obj, data_range=data_range, **restargs)
-            flying.append({'handler':handler, 'start':start, 'data_range':data_range})
-            broken = []
-            flying = self._manage_downloading_greenlets(flying, objfile, broken_greenlets=broken)
-            #workaround for eventlib bug that breaks greenlets: replace them with new ones
-            for brgr in broken:
-                brgr['handler'] = self._get_block_async(obj, data_range=brgr['data_range'],
-                    **restargs)
-                flying.append(brgr)
-                               
-        #write the last results and exit
-        while len(flying) > 0:
-            broken = []
-            flying=self._manage_downloading_greenlets(flying, objfile, broken_greenlets=broken,
-                sleeptime=0.1)
-            #workaround for eventlib bug that breaks greenlets: replace them with new ones
-            for brgr in broken:
-                brgr['handler'] = self._get_block_async(obj, data_range=brgr['data_range'],
-                    **restargs)
-                flying.append(brgr)
-        objfile.truncate(total_size)
-
-        gevent.joinall(flying)
-
-    def _append_missing_blocks(self, obj, objfile, hmap, resumed, blocksize, total_size,
-        download_gen=None, custom_start=None, custom_end=None, **restargs):
-        for i, h in enumerate(hmap):
-            if h in resumed:
-                continue
-            if download_gen:
+    def _greenlet2file(self, flying_greenlets, local_file, broken={}, **restargs):
+        finished = []
+        for start, g in flying_greenlets.items():
+            if g.ready():
+                if g.exception:
+                    g.release()
+                    raise g.exception
                 try:
-                    download_gen.next()
-                except StopIteration:
-                    pass
-            (start, end) = self._get_block_range(i, blocksize, total_size, custom_start, custom_end)
-            data_range = 'bytes=%s-%s'%(start, end)
-            r = self._get_block(obj, data_range=data_range, **restargs)
-            objfile.write(r.content)
-            objfile.flush() 
-
-    def download_object(self, obj, objfile, download_cb=None, version=None, overide=False, range=None,
-        if_match=None, if_none_match=None, if_modified_since=None, if_unmodified_since=None):
-        """overide is forcing the local file to become exactly as the remote, even if it is
-        substantialy different
-        """
-
-        self.assert_container()
-
-        (blocksize, blockhash, total_size, hmap, map_dict) = self._get_object_block_info(obj,
-            version=version, if_match=if_match, if_none_match=if_none_match,
-            if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
-
-        if total_size <= 0:
-            return
-
-        (custom_start, custom_end) = (None, None) if range is None \
-            else self._get_range_limits(range)
-
-        #load progress bar
-        if download_cb is not None:
-            download_gen = download_cb(total_size/blocksize + 1)
-            download_gen.next()
-
-        resumed = self._get_downloaded_blocks(hmap, objfile, blocksize, blockhash, map_dict,
-            overide=overide, download_gen=download_gen)
-        restargs=dict(version=version, if_etag_match=if_match, if_etag_not_match=if_none_match,
-            if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
-
-        if objfile.isatty():
-            self._append_missing_blocks(obj, objfile, hmap, resumed, blocksize, total_size,
-                download_gen, custom_start=custom_start, custom_end=custom_end, **restargs)
+                    block = g.value.content
+                except AttributeError:
+                    broken[start] = flying_greenlets.pop(start)
+                    continue
+                local_file.seek(start)
+                local_file.write(block)
+                #local_file.flush()
+                self._cb_next()
+                finished.append(flying_greenlets.pop(start))
+        local_file.flush()
+        return finished
+
+    def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file, **restargs):
+
+        #let them fly
+        flying_greenlets = {}
+        finished_greenlets = []
+        broken = {}
+        for block_hash, blockid in remote_hashes.items():
+            start = blocksize*blockid
+            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
+            restargs['data_range'] = 'bytes=%s-%s'%(start, end)
+            #store info for relaunching greenlet if needed
+            flying_greenlets[start] = self._get_block_async(obj, **restargs)
+            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
+                **restargs)
+
+        #check the greenlets
+        while len(flying_greenlets) > 0:
+            sleep(0.1)
+            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
+                **restargs)
+
+        gevent.joinall(finished_greenlets)
+
+
+    def download_object(self, obj, dst, download_cb=None, version=None, overide=False, resume=False,
+        range=None, if_match=None, if_none_match=None, if_modified_since=None,
+        if_unmodified_since=None):
+
+        #init REST api args
+        restargs=dict(version=version,
+            data_range = None if range is None else 'bytes=%s'%range,
+            if_match=if_match,
+            if_none_match=if_none_match,
+            if_modified_since=if_modified_since,
+            if_unmodified_since=if_unmodified_since)
+
+        #1. get remote object hash info
+        (   blocksize,
+            blockhash,
+            total_size,
+            hash_list, 
+            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
+        assert total_size >= 0
+
+        if download_cb:
+            self.progress_bar_gen = download_cb(len(remote_hashes)+1)
+            self._cb_next()
+
+        if dst.isatty():
+            self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
+        elif resume:
+            self._filter_out_downloaded_hashes(remote_hashes, hash_list, dst, blocksize, blockhash)
+            self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
         else:
-            self._async_download_missing_blocks(obj, objfile, hmap, resumed, blocksize, total_size,
-                download_gen, custom_start=custom_start, custom_end=custom_end, **restargs)
+            self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, **restargs)
+            dst.truncate(total_size)
+
+        self._complete_cb()
 
+    #Command Progress Bar methods
+    def _cb_next(self):
+        if hasattr(self, 'progress_bar_gen'):
+            try:
+                self.progress_bar_gen.next()
+            except StopIteration:
+                pass
+
+    def _complete_cb(self):
+        while True:
+            try:
+                self.progress_bar_gen.next()
+            except (AttributeError, StopIteration):
+                break
 
     def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
-        if_modified_since=None, if_unmodified_since=None):
+        if_modified_since=None, if_unmodified_since=None, data_range=None):
         try:
             r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
                 if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
-                if_unmodified_since=if_unmodified_since)
+                if_unmodified_since=if_unmodified_since, data_range=data_range)
         except ClientError as err:
-            
             if err.status == 304 or err.status == 412:
                 return {}
             raise
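
For reference, the new _cb_next()/_complete_cb() hooks only assume that download_cb(n) returns a generator whose next() is invoked once per finished block. A minimal compatible callback (illustrative only, not part of this commit) could look like:

    def simple_download_cb(total):
        # download_cb(total) must return a generator; each next() marks one
        # more block done, and StopIteration ends _complete_cb()'s drain loop
        for done in range(1, total + 1):
            print('downloaded block %s/%s' % (done, total))
            yield

    # 'client' stands for any configured PithosClient instance
    client.download_object('myobject', open('local.bin', 'wb+'),
        download_cb=simple_download_cb)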