From: Stavros Sachtouris
Date: Fri, 14 Sep 2012 12:50:47 +0000 (+0300)
Subject: Refactor pithos+ download
X-Git-Tag: v0.6~135
X-Git-Url: https://code.grnet.gr/git/kamaki/commitdiff_plain/fbfee225e20579d435ad145bea0d48aaa0c43853

Refactor pithos+ download

3 modes:
a. sequential
b. sequential with resume
c. asynchronous/parallel

bug: Still can't multi-download correctly
---

diff --git a/kamaki/cli/commands/pithos_cli.py b/kamaki/cli/commands/pithos_cli.py
index bc5449e..f88c0e0 100644
--- a/kamaki/cli/commands/pithos_cli.py
+++ b/kamaki/cli/commands/pithos_cli.py
@@ -567,8 +567,8 @@ class store_download(_store_container_command):
         super(self.__class__, self).update_parser(parser)
         parser.add_argument('--no-progress-bar', action='store_true', dest='no_progress_bar', default=False,
             help='Dont display progress bars')
-        parser.add_argument('--overide', action='store_true', dest='overide', default=False,
-            help='Force download to overide an existing file')
+        parser.add_argument('--resume', action='store_true', dest='resume', default=False,
+            help='Enable download resume (slower)')
         parser.add_argument('--range', action='store', dest='range', default=None,
             help='show range of data')
         parser.add_argument('--if-match', action='store', dest='if_match', default=None,
@@ -591,10 +591,10 @@ class store_download(_store_container_command):
             out = stdout
         else:
             try:
-                if getattr(self.args, 'overide'):
-                    out = open(local_path, 'wb+')
-                else:
+                if getattr(self.args, 'resume'):
                     out = open(local_path, 'ab+')
+                else:
+                    out = open(local_path, 'wb+')
             except IOError as err:
                 raise CLIError(message='Cannot write to file %s - %s'%(local_path,unicode(err)),
                     importance=1)
@@ -605,7 +605,7 @@ class store_download(_store_container_command):
         try:
             self.client.download_object(self.path, out, download_cb,
                 range=getattr(self.args, 'range'), version=getattr(self.args,'object_version'),
-                if_match=getattr(self.args, 'if_match'), overide=getattr(self.args, 'overide'),
+                if_match=getattr(self.args, 'if_match'), resume=getattr(self.args, 'resume'),
                 if_none_match=getattr(self.args, 'if_none_match'),
                 if_modified_since=getattr(self.args, 'if_modified_since'),
                 if_unmodified_since=getattr(self.args, 'if_unmodified_since'))
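
The CLI half of the change swaps the destructive --overide flag for --resume: with resume the destination file is opened with mode 'ab+', so bytes already on disk survive and matching blocks can be skipped, while the default 'wb+' truncates and re-downloads from scratch. As a minimal standalone sketch of that choice (open_destination and file_path are illustrative names, not part of kamaki):

    #'ab+' keeps existing local data so a download can resume;
    #'wb+' truncates for a clean, full download
    def open_destination(file_path, resume=False):
        return open(file_path, 'ab+' if resume else 'wb+')
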
diff --git a/kamaki/clients/pithos.py b/kamaki/clients/pithos.py
index 6eb34ba..6650905 100644
--- a/kamaki/clients/pithos.py
+++ b/kamaki/clients/pithos.py
@@ -210,97 +210,51 @@ class PithosClient(PithosRestAPI):
         self.object_put(obj, format='json', hashmap=True, content_type=content_type, json=hashmap,
             success=201)
-
+    #download_* auxiliary methods
-    def _get_object_block_info(self,obj, **kwargs):
+    def _get_remote_blocks_info(self, obj, **restargs):
         #retrieve object hashmap
-        hashmap = self.get_object_hashmap(obj, **kwargs)
+        hashmap = self.get_object_hashmap(obj, **restargs)
         blocksize = int(hashmap['block_size'])
         blockhash = hashmap['block_hash']
         total_size = hashmap['bytes']
-        hmap = hashmap['hashes']
+        print('total_size:%s, blocksize:%s, x/y:%s, len:%s'%(total_size, blocksize,
+            total_size/blocksize + 1, len(hashmap['hashes'])))
+        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
         map_dict = {}
-        for h in hmap:
-            map_dict[h] = True
-        return (blocksize, blockhash, total_size, hmap, map_dict)
+        for i, h in enumerate(hashmap['hashes']):
+            map_dict[h] = i
+        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

-    def _get_range_limits(self, range):
-        try:
-            (custom_start, custom_end) = range.split('-')
-            (custom_start, custom_end) = (int(custom_start), int(custom_end))
-        except ValueError:
-            raise ClientError(message='Invalid range string', status=601)
-        if custom_start > custom_end or custom_start < 0:
-            raise ClientError(message='Negative range', status=601)
-        elif custom_start == custom_end:
-            return
-        elif custom_end > total_size:
-            raise ClientError(message='Range exceeds file size', status=601)
-        return (custom_start, custom_end)
-
-    def _get_downloaded_blocks(self, hmap, fileobj, blocksize, blockhash, map_dict,
-        overide=False, download_gen=None):
-        if fileobj.isatty() or not path.exists(fileobj.name):
-            return {}
-        h = HashMap(blocksize, blockhash)
-        with_progress_bar = False if download_gen is None else True
-        h.load(fileobj, with_progress_bar)
-        resumed = {}
-        for i, x in enumerate(h):
-            existing_hash = hexlify(x)
-            if existing_hash in map_dict:
-                #resume if some blocks have been downloaded
-                resumed[existing_hash] = i
-                if with_progress_bar:
-                    try:
-                        download_gen.next()
-                    except:
-                        pass
-            elif not overide:
-                raise ClientError(message='Local file is substantialy different',
-                    status=600)
-        return resumed
-
-    def _get_block_range(self, blockid, blocksize, total_size, custom_start, custom_end):
-        start = blockid*blocksize
-        if custom_start is not None:
-            if start < custom_start:
-                start = custom_start
-            elif start > custom_end:
-                return (None, None)
-        end = start + blocksize -1 if start+blocksize < total_size else total_size -1
-        if custom_end is not None and end > custom_end:
-            end = custom_end
-        return (start, end)
-
-    def _manage_downloading_greenlets(self, flying, objfile, broken_greenlets = [], sleeptime=0):
-        newflying = []
-        for v in flying:
-            h = v['handler']
-            if h.ready():
-                if h.exception:
-                    h.release()
-                    raise h.exception
-                try:
-                    block = h.value.content
-                except AttributeError:
-                    #catch greenlets that break due to an eventlist bug
-                    print('- - - - - > Got a borken greenlet here')
-                    broken_greenlets.append(v)
-                    continue
-                objfile.seek(v['start'])
-                objfile.write(block)
-                objfile.flush()
+    def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, **restargs):
+        for blockid, blockhash in enumerate(remote_hashes):
+            if blockhash is None:
+                continue
+            start = blocksize*blockid
+            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
+            restargs['data_range'] = 'bytes=%s-%s'%(start, end)
+            r = self.object_get(obj, success=(200, 206), **restargs)
+            self._cb_next()
+            dst.write(r.content)
+            dst.flush()
+
+    def _filter_out_downloaded_hashes(self, remote_hashes, hash_list, local_file, blocksize,
+        blockhash):
+        #load file hashmap
+        file_hashmap = HashMap(blocksize, blockhash)
+        file_hashmap.load(local_file, hasattr(self, 'progress_bar_gen'))
+
+        #filter out blocks that are already downloaded
+        for i, x in enumerate(file_hashmap):
+            local_hash = hexlify(x)
+            if local_hash in remote_hashes:
+                blockid = remote_hashes.pop(local_hash)
+                hash_list[blockid] = None
+                self._cb_next()
             else:
-                #if there are unfinished greenlets, sleep for some time - be carefull with that
-                sleep(sleeptime)
-                newflying.append(v)
-        return newflying
-
-    def _get_block(self, obj, **kwargs):
-        return self.object_get(obj, success=(200, 206), binary=True, **kwargs)
+                raise ClientError(message='Local file is substantially different', status=600)

-    def _get_block_async(self, obj, **kwargs):
+    def _get_block_async(self, obj, **restargs):
         class SilentGreenlet(gevent.Greenlet):
             def _report_error(self, exc_info):
                 _stderr = sys._stderr
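
Two building blocks from the hunk above are worth isolating. _get_remote_blocks_info now maps each remote block hash to its block index (instead of True), which is what lets resume mark a block as already present, and _dump_blocks_sync turns a block id into an HTTP Range header, clamping the last block to the object size. A standalone sketch of that range arithmetic (the sample sizes below are illustrative, not values from the commit):

    #block id -> 'bytes=start-end' Range value; the final block is clamped to total_size
    def block_range(blockid, blocksize, total_size):
        start = blocksize * blockid
        end = total_size - 1 if start + blocksize > total_size else start + blocksize - 1
        return 'bytes=%s-%s' % (start, end)

    assert block_range(0, 4194304, 10000000) == 'bytes=0-4194303'
    assert block_range(2, 4194304, 10000000) == 'bytes=8388608-9999999'
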
@@ -309,116 +263,113 @@ class PithosClient(PithosRestAPI):
                     gevent.Greenlet._report_error(self, exc_info)
                 finally:
                     sys.stderr = _stderr
-        POOL_SIZE =7
+        self.POOL_SIZE = 5
         if self.async_pool is None:
-            self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
-        g = SilentGreenlet(self._get_block, obj, **kwargs)
+            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
+        g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
         self.async_pool.start(g)
         return g

-    def _async_download_missing_blocks(self, obj, objfile, hmap, resumed, blocksize, total_size,
-        download_gen=None, custom_start = None, custom_end=None, **restargs):
-        """Attempt pseudo-multithreaded (with greenlets) download of blocks, or if that
-        is not possible retreat to sequensial block download
-        """
-
-        flying = []
-        for i, h in enumerate(hmap):
-            if h in resumed:
-                continue
-            if download_gen:
-                try:
-                    download_gen.next()
-                except StopIteration:
-                    pass
-            (start, end) = self._get_block_range(i, blocksize, total_size, custom_start, custom_end)
-            if start is None:
-                continue
-            data_range = 'bytes=%s-%s'%(start, end)
-            handler = self._get_block_async(obj, data_range=data_range, **restargs)
-            flying.append({'handler':handler, 'start':start, 'data_range':data_range})
-            broken = []
-            flying = self._manage_downloading_greenlets(flying, objfile, broken_greenlets=broken)
-            #workaround for eventlib bug that breaks greenlets: replace them with new ones
-            for brgr in broken:
-                brgr['handler'] = self._get_block_async(obj, data_range=brgr['data_range'],
-                    **restargs)
-                flying.append(brgr)
-
-        #write the last results and exit
-        while len(flying) > 0:
-            broken = []
-            flying=self._manage_downloading_greenlets(flying, objfile, broken_greenlets=broken,
-                sleeptime=0.1)
-            #workaround for eventlib bug that breaks greenlets: replace them with new ones
-            for brgr in broken:
-                brgr['handler'] = self._get_block_async(obj, data_range=brgr['data_range'],
-                    **restargs)
-                flying.append(brgr)
-        objfile.truncate(total_size)
-
-        gevent.joinall(flying)
-
-    def _append_missing_blocks(self, obj, objfile, hmap, resumed, blocksize, total_size,
-        download_gen=None, custom_start=None, custom_end=None, **restargs):
-        for i, h in enumerate(hmap):
-            if h in resumed:
-                continue
-            if download_gen:
+    def _greenlet2file(self, flying_greenlets, local_file, broken={}, **restargs):
+        finished = []
+        for start, g in flying_greenlets.items():
+            if g.ready():
+                if g.exception:
+                    g.release()
+                    raise g.exception
                 try:
-                    download_gen.next()
-                except StopIteration:
-                    pass
-            (start, end) = self._get_block_range(i, blocksize, total_size, custom_start, custom_end)
-            data_range = 'bytes=%s-%s'%(start, end)
-            r = self._get_block(obj, data_range=data_range, **restargs)
-            objfile.write(r.content)
-            objfile.flush()
-
-    def download_object(self, obj, objfile, download_cb=None, version=None, overide=False, range=None,
-        if_match=None, if_none_match=None, if_modified_since=None, if_unmodified_since=None):
-        """overide is forcing the local file to become exactly as the remote, even if it is
-        substantialy different
-        """
-
-        self.assert_container()
-
-        (blocksize, blockhash, total_size, hmap, map_dict) = self._get_object_block_info(obj,
-            version=version, if_match=if_match, if_none_match=if_none_match,
-            if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
-
-        if total_size <= 0:
-            return
-
-        (custom_start, custom_end) = (None, None) if range is None \
-            else self._get_range_limits(range)
-
-        #load progress bar
-        if download_cb is not None:
-            download_gen = download_cb(total_size/blocksize + 1)
-            download_gen.next()
-
-        resumed = self._get_downloaded_blocks(hmap, objfile, blocksize, blockhash, map_dict,
-            overide=overide, download_gen=download_gen)
-        restargs=dict(version=version, if_etag_match=if_match, if_etag_not_match=if_none_match,
-            if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
-
-        if objfile.isatty():
-            self._append_missing_blocks(obj, objfile, hmap, resumed, blocksize, total_size,
-                download_gen, custom_start=custom_start, custom_end=custom_end, **restargs)
+                    block = g.value.content
+                except AttributeError:
+                    broken[start] = flying_greenlets.pop(start)
+                    continue
+                local_file.seek(start)
+                local_file.write(block)
+                #local_file.flush()
+                self._cb_next()
+                finished.append(flying_greenlets.pop(start))
+        local_file.flush()
+        return finished
+
+    def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file, **restargs):
+        #let them fly
+        flying_greenlets = {}
+        finished_greenlets = []
+        broken = {}
+        for block_hash, blockid in remote_hashes.items():
+            start = blocksize*blockid
+            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
+            restargs['data_range'] = 'bytes=%s-%s'%(start, end)
+            #store info for relaunching greenlet if needed
+            flying_greenlets[start] = self._get_block_async(obj, **restargs)
+            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
+                **restargs)
+
+        #check the greenlets
+        while len(flying_greenlets) > 0:
+            sleep(0.1)
+            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
+                **restargs)
+
+        gevent.joinall(finished_greenlets)
+
+    def download_object(self, obj, dst, download_cb=None, version=None, overide=False, resume=False,
+        range=None, if_match=None, if_none_match=None, if_modified_since=None,
+        if_unmodified_since=None):
+
+        #init REST api args
+        restargs=dict(version=version,
+            data_range = None if range is None else 'bytes=%s'%range,
+            if_match=if_match,
+            if_none_match=if_none_match,
+            if_modified_since=if_modified_since,
+            if_unmodified_since=if_unmodified_since)
+
+        #1. get remote object hash info
+        ( blocksize,
+            blockhash,
+            total_size,
+            hash_list,
+            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
+        assert total_size >= 0
+
+        if download_cb:
+            self.progress_bar_gen = download_cb(len(remote_hashes)+1)
+            self._cb_next()
+
+        if dst.isatty():
+            self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
+        elif resume:
+            self._filter_out_downloaded_hashes(remote_hashes, hash_list, dst, blocksize, blockhash)
+            self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
         else:
-            self._async_download_missing_blocks(obj, objfile, hmap, resumed, blocksize, total_size,
-                download_gen, custom_start=custom_start, custom_end=custom_end, **restargs)
+            self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, **restargs)
+            dst.truncate(total_size)
+
+        self._complete_cb()
+
+    #Command Progress Bar method
+    def _cb_next(self):
+        if hasattr(self, 'progress_bar_gen'):
+            try:
+                self.progress_bar_gen.next()
+            except:
+                pass
+
+    def _complete_cb(self):
+        while True:
+            try:
+                self.progress_bar_gen.next()
+            except:
+                break

     def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
-        if_modified_since=None, if_unmodified_since=None):
+        if_modified_since=None, if_unmodified_since=None, data_range=None):
         try:
             r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
                 if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
-                if_unmodified_since=if_unmodified_since)
+                if_unmodified_since=if_unmodified_since, data_range=data_range)
         except ClientError as err:
-            if err.status == 304 or err.status == 412: return {}
             raise
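
In the asynchronous mode above, each missing block is fetched by its own greenlet (capped by a pool), and _greenlet2file drains whichever requests finish first, seeking to each block's offset before writing so completions may land in any order; broken greenlets are kept aside for relaunching, and a progress callback is ticked per block. The general shape of that pattern, reduced to a self-contained gevent sketch (fetch_block, block_ranges and pool_size are illustrative stand-ins, not kamaki code):

    import gevent
    import gevent.pool

    def fetch_block(start, end):
        #stands in for a ranged HTTP GET returning the block's bytes
        return b'x' * (end - start + 1)

    def parallel_download(block_ranges, local_file, pool_size=5):
        pool = gevent.pool.Pool(size=pool_size)
        flying = dict((start, pool.spawn(fetch_block, start, end))
                      for start, end in block_ranges)
        while flying:
            gevent.sleep(0.1)
            for start, g in list(flying.items()):
                if g.ready():
                    local_file.seek(start)  #each block lands at its own offset
                    local_file.write(g.get())
                    del flying[start]

    #usage: parallel_download([(0, 3), (4, 7)], open('out.bin', 'wb+'))

Writing at seek offsets is what makes out-of-order completion safe; it is also why this mode needs a real seekable destination, while a tty falls back to the sequential mode.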