self.object_put(obj, format='json', hashmap=True, content_type=content_type,
json=hashmap, success=201)
-
+
#download_* auxiliary methods
- def _get_object_block_info(self,obj, **kwargs):
+ def _get_remote_blocks_info(self, obj, **restargs):
#retrieve object hashmap
- hashmap = self.get_object_hashmap(obj, **kwargs)
+ hashmap = self.get_object_hashmap(obj, **restargs)
blocksize = int(hashmap['block_size'])
blockhash = hashmap['block_hash']
total_size = hashmap['bytes']
- hmap = hashmap['hashes']
+ print('total_size:%s, blocksize:%s, x/y:%s, len:%s'%(total_size, blocksize,
+ total_size/blocksize + 1, len(hashmap['hashes'])))
+ #assert total_size/blocksize + 1 == len(hashmap['hashes'])
map_dict = {}
- for h in hmap:
- map_dict[h] = True
- return (blocksize, blockhash, total_size, hmap, map_dict)
+ for i, h in enumerate(hashmap['hashes']):
+ map_dict[h] = i
+ return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
- def _get_range_limits(self, range):
- try:
- (custom_start, custom_end) = range.split('-')
- (custom_start, custom_end) = (int(custom_start), int(custom_end))
- except ValueError:
- raise ClientError(message='Invalid range string', status=601)
- if custom_start > custom_end or custom_start < 0:
- raise ClientError(message='Negative range', status=601)
- elif custom_start == custom_end:
- return
- elif custom_end > total_size:
- raise ClientError(message='Range exceeds file size', status=601)
- return (custom_start, custom_end)
-
- def _get_downloaded_blocks(self, hmap, fileobj, blocksize, blockhash, map_dict,
- overide=False, download_gen=None):
- if fileobj.isatty() or not path.exists(fileobj.name):
- return {}
- h = HashMap(blocksize, blockhash)
- with_progress_bar = False if download_gen is None else True
- h.load(fileobj, with_progress_bar)
- resumed = {}
- for i, x in enumerate(h):
- existing_hash = hexlify(x)
- if existing_hash in map_dict:
- #resume if some blocks have been downloaded
- resumed[existing_hash] = i
- if with_progress_bar:
- try:
- download_gen.next()
- except:
- pass
- elif not overide:
- raise ClientError(message='Local file is substantialy different',
- status=600)
- return resumed
-
- def _get_block_range(self, blockid, blocksize, total_size, custom_start, custom_end):
- start = blockid*blocksize
- if custom_start is not None:
- if start < custom_start:
- start = custom_start
- elif start > custom_end:
- return (None, None)
- end = start + blocksize -1 if start+blocksize < total_size else total_size -1
- if custom_end is not None and end > custom_end:
- end = custom_end
- return (start, end)
-
- def _manage_downloading_greenlets(self, flying, objfile, broken_greenlets = [], sleeptime=0):
- newflying = []
- for v in flying:
- h = v['handler']
- if h.ready():
- if h.exception:
- h.release()
- raise h.exception
- try:
- block = h.value.content
- except AttributeError:
- #catch greenlets that break due to an eventlist bug
- print('- - - - - > Got a borken greenlet here')
- broken_greenlets.append(v)
- continue
- objfile.seek(v['start'])
- objfile.write(block)
- objfile.flush()
+ def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, **restargs):
+ for blockid, blockhash in enumerate(remote_hashes):
+ if blockhash == None:
+ continue
+ start = blocksize*blockid
+ end = total_size-1 if start+blocksize > total_size else start+blocksize-1
+ restargs['data_range'] = 'bytes=%s-%s'%(start, end)
+ r = self.object_get(obj, success=(200, 206), **restargs)
+ self._cb_next()
+ dst.write(r.content)
+ dst.flush()
+
+ def _filter_out_downloaded_hashses(self, remote_hashes, hash_list, local_file, blocksize,
+ blockhash):
+ #load file hashmap
+ file_hashmap = HashMap(blocksize, blockhash)
+ file_hashmap.load(local_file, hasattr(self, 'progress_bar_gen'))
+
+ #filter out blocks that are already downloaded
+ for i, x in enumerate(file_hashmap):
+ local_hash = hexlify(x)
+ if local_hash in remote_hashes:
+ blockid = remote_hashes.pop(local_hash)
+ hash_list[blockid] = None
+ self._cb_next()
else:
- #if there are unfinished greenlets, sleep for some time - be carefull with that
- sleep(sleeptime)
- newflying.append(v)
- return newflying
-
- def _get_block(self, obj, **kwargs):
- return self.object_get(obj, success=(200, 206), binary=True, **kwargs)
+ raise ClientError(message='Local file is substantialy different', status=600)
- def _get_block_async(self, obj, **kwargs):
+ def _get_block_async(self, obj, **restargs):
class SilentGreenlet(gevent.Greenlet):
def _report_error(self, exc_info):
_stderr = sys._stderr
gevent.Greenlet._report_error(self, exc_info)
finally:
sys.stderr = _stderr
- POOL_SIZE =7
+ self.POOL_SIZE = 5
if self.async_pool is None:
- self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
- g = SilentGreenlet(self._get_block, obj, **kwargs)
+ self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
+ g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
self.async_pool.start(g)
return g
- def _async_download_missing_blocks(self, obj, objfile, hmap, resumed, blocksize, total_size,
- download_gen=None, custom_start = None, custom_end=None, **restargs):
- """Attempt pseudo-multithreaded (with greenlets) download of blocks, or if that
- is not possible retreat to sequensial block download
- """
-
- flying = []
- for i, h in enumerate(hmap):
- if h in resumed:
- continue
- if download_gen:
- try:
- download_gen.next()
- except StopIteration:
- pass
- (start, end) = self._get_block_range(i, blocksize, total_size, custom_start, custom_end)
- if start is None:
- continue
- data_range = 'bytes=%s-%s'%(start, end)
- handler = self._get_block_async(obj, data_range=data_range, **restargs)
- flying.append({'handler':handler, 'start':start, 'data_range':data_range})
- broken = []
- flying = self._manage_downloading_greenlets(flying, objfile, broken_greenlets=broken)
- #workaround for eventlib bug that breaks greenlets: replace them with new ones
- for brgr in broken:
- brgr['handler'] = self._get_block_async(obj, data_range=brgr['data_range'],
- **restargs)
- flying.append(brgr)
-
- #write the last results and exit
- while len(flying) > 0:
- broken = []
- flying=self._manage_downloading_greenlets(flying, objfile, broken_greenlets=broken,
- sleeptime=0.1)
- #workaround for eventlib bug that breaks greenlets: replace them with new ones
- for brgr in broken:
- brgr['handler'] = self._get_block_async(obj, data_range=brgr['data_range'],
- **restargs)
- flying.append(brgr)
- objfile.truncate(total_size)
-
- gevent.joinall(flying)
-
- def _append_missing_blocks(self, obj, objfile, hmap, resumed, blocksize, total_size,
- download_gen=None, custom_start=None, custom_end=None, **restargs):
- for i, h in enumerate(hmap):
- if h in resumed:
- continue
- if download_gen:
+ def _greenlet2file(self, flying_greenlets, local_file, broken={}, **restargs):
+ finished = []
+ for start, g in flying_greenlets.items():
+ if g.ready():
+ if g.exception:
+ g.release()
+ raise g.exception
try:
- download_gen.next()
- except StopIteration:
- pass
- (start, end) = self._get_block_range(i, blocksize, total_size, custom_start, custom_end)
- data_range = 'bytes=%s-%s'%(start, end)
- r = self._get_block(obj, data_range=data_range, **restargs)
- objfile.write(r.content)
- objfile.flush()
-
- def download_object(self, obj, objfile, download_cb=None, version=None, overide=False, range=None,
- if_match=None, if_none_match=None, if_modified_since=None, if_unmodified_since=None):
- """overide is forcing the local file to become exactly as the remote, even if it is
- substantialy different
- """
-
- self.assert_container()
-
- (blocksize, blockhash, total_size, hmap, map_dict) = self._get_object_block_info(obj,
- version=version, if_match=if_match, if_none_match=if_none_match,
- if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
-
- if total_size <= 0:
- return
-
- (custom_start, custom_end) = (None, None) if range is None \
- else self._get_range_limits(range)
-
- #load progress bar
- if download_cb is not None:
- download_gen = download_cb(total_size/blocksize + 1)
- download_gen.next()
-
- resumed = self._get_downloaded_blocks(hmap, objfile, blocksize, blockhash, map_dict,
- overide=overide, download_gen=download_gen)
- restargs=dict(version=version, if_etag_match=if_match, if_etag_not_match=if_none_match,
- if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
-
- if objfile.isatty():
- self._append_missing_blocks(obj, objfile, hmap, resumed, blocksize, total_size,
- download_gen, custom_start=custom_start, custom_end=custom_end, **restargs)
+ block = g.value.content
+ except AttributeError:
+ broken[start] = flying_greenlets.pop(start)
+ continue
+ local_file.seek(start)
+ local_file.write(block)
+ #local_file.flush()
+ self._cb_next()
+ finished.append(flying_greenlets.pop(start))
+ local_file.flush()
+ return finished
+
+ def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file, **restargs):
+
+ #let them fly
+ flying_greenlets = {}
+ finished_greenlets = []
+ broken = {}
+ for block_hash, blockid in remote_hashes.items():
+ start = blocksize*blockid
+ end = total_size-1 if start+blocksize > total_size else start+blocksize-1
+ restargs['data_range'] = 'bytes=%s-%s'%(start, end)
+ #track each greenlet by the start offset of its block
+ flying_greenlets[start] = self._get_block_async(obj, **restargs)
+ finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
+ **restargs)
+
+ #poll the flying greenlets until none is left in flight
+ while len(flying_greenlets) > 0:
+ sleep(0.1)
+ finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
+ **restargs)
+
+ gevent.joinall(finished_greenlets)
+
+
+ def download_object(self, obj, dst, download_cb=None, version=None, overide=False, resume=False,
+ range=None, if_match=None, if_none_match=None, if_modified_since=None,
+ if_unmodified_since=None):
+
+ #init REST api args
+ restargs=dict(version=version,
+ data_range = None if range is None else 'bytes=%s'%range,
+ if_match=if_match,
+ if_none_match=if_none_match,
+ if_modified_since=if_modified_since,
+ if_unmodified_since=if_unmodified_since)
+
+ #1. get remote object hash info
+ ( blocksize,
+ blockhash,
+ total_size,
+ hash_list,
+ remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
+ assert total_size >= 0
+
+ if download_cb:
+ self.progress_bar_gen = download_cb(len(remote_hashes)+1)
+ self._cb_next()
+
+ if dst.isatty():
+ self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
+ elif resume:
+ self._filter_out_downloaded_hashses(remote_hashes, hash_list, dst, blocksize, blockhash)
+ self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
else:
- self._async_download_missing_blocks(obj, objfile, hmap, resumed, blocksize, total_size,
- download_gen, custom_start=custom_start, custom_end=custom_end, **restargs)
+ self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, **restargs)
+ dst.truncate(total_size)
+
+ self._complete_cb()
+ #Command Progress Bar methods
+ def _cb_next(self):
+ if hasattr(self, 'progress_bar_gen'):
+ try:
+ self.progress_bar_gen.next()
+ except:
+ pass
+ def _complete_cb(self):
+ while True:
+ try:
+ self.progress_bar_gen.next()
+ except:
+ break
def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
- if_modified_since=None, if_unmodified_since=None):
+ if_modified_since=None, if_unmodified_since=None, data_range=None):
try:
r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
- if_unmodified_since=if_unmodified_since)
+ if_unmodified_since=if_unmodified_since, data_range=data_range)
except ClientError as err:
-
if err.status == 304 or err.status == 412:
return {}
raise