h.update(block.strip('\x00'))
return hexlify(h.digest())
- def _thread2file(self, flying, local_file, offset=0, **restargs):
+ def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
"""write the results of a greenleted rest call to a file
:param offset: the offset of the file up to blocksize
- e.g. if the range is 10-100, all blocks will be written to
normal_position - 10
"""
- finished = []
- for i, (start, g) in enumerate(flying.items()):
- if not g.isAlive():
- if g.exception:
- raise g.exception
- block = g.value.content
- local_file.seek(start - offset)
+ for i, (key, g) in enumerate(flying.items()):
+ if g.isAlive():
+ continue
+ if g.exception:
+ raise g.exception
+ block = g.value.content
+ for block_start in blockids[key]:
+ local_file.seek(block_start + offset)
local_file.write(block)
self._cb_next()
- finished.append(flying.pop(start))
+ flying.pop(key)
+ blockids.pop(key)
local_file.flush()
- return finished
def _dump_blocks_async(
        self, obj, remote_hashes, blocksize, total_size, local_file,
        blockhash=None, resume=False, filerange=None, **restargs):
    """Download the blocks of a remote object and write them to local_file.

    Identical blocks (same hash at several positions) are downloaded once
    and written to every position by _thread2file.

    :param obj: the remote object whose blocks are downloaded
    :param remote_hashes: (dict) maps a block hash to the list of block
        indices at which that content occurs in the object
    :param blocksize: (int) size of one block in bytes
    :param total_size: (int) total object size in bytes
    :param local_file: open local file object to write into
    :param blockhash: hash algorithm used to checksum local blocks
    :param resume: (bool) if True, local blocks whose hash already matches
        are not downloaded again
    :param filerange: (str) "start-end" byte range limiting the download
    """
    file_size = fstat(local_file.fileno()).st_size if resume else 0
    # key (first unsaved offset of a hash) -> async download thread
    flying = dict()
    # key -> every file offset where that block's content must be written
    blockid_dict = dict()
    offset = 0
    if filerange is not None:
        rstart = int(filerange.split('-')[0])
        # NOTE(review): rstart is parsed but never used below, so offset
        # stays 0 even for ranged downloads - it looks like offset was
        # meant to be derived from rstart; confirm against the callers.
    self._init_thread_limit()
    for block_hash, blockids in remote_hashes.items():
        # convert block indices to absolute byte offsets in the file
        blockids = [blk * blocksize for blk in blockids]
        # keep only offsets whose local content does not already match
        # this hash (when resuming, matching blocks are skipped)
        unsaved = [blk for blk in blockids if not (
            blk < file_size and block_hash == self._hash_from_file(
                local_file, blk, blocksize, blockhash))]
        # advance the progress bar for the blocks that were skipped
        self._cb_next(len(blockids) - len(unsaved))
        if unsaved:
            # download the content once, keyed by its first offset;
            # _thread2file writes it to all offsets listed in unsaved
            key = unsaved[0]
            self._watch_thread_limit(flying.values())
            # flush any already-finished downloads to the file
            self._thread2file(
                flying, blockid_dict, local_file, offset,
                **restargs)
            end = total_size - 1 if key + blocksize > total_size\
                else key + blocksize - 1
            start, end = _range_up(key, end, filerange)
            if start == end:
                # nothing to fetch for this block (presumably clipped
                # away by _range_up for the requested range) - skip it
                self._cb_next()
                continue
            restargs['async_headers'] = {
                'Range': 'bytes=%s-%s' % (start, end)}
            flying[key] = self._get_block_async(obj, **restargs)
            blockid_dict[key] = unsaved

    # wait for the remaining downloads and write them out
    for thread in flying.values():
        thread.join()
    self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
def download_object(
self, obj, dst,
self._complete_cb()
#Command Progress Bar method
- def _cb_next(self):
+ def _cb_next(self, step=1):
if hasattr(self, 'progress_bar_gen'):
try:
- self.progress_bar_gen.next()
+ self.progress_bar_gen.next(step)
except:
pass
def _complete_cb(self):
while True:
try:
- self.progress_bar_gen.next()
+ self.progress_bar_gen.next(step)
except:
break