- e.g. if the range is 10-100, all blocks will be written to
normal_position - 10
"""
- for i, (key, g) in enumerate(flying.items()):
+ for key, g in flying.items():
if g.isAlive():
continue
if g.exception:
self._thread2file(
flying, blockid_dict, local_file, offset,
**restargs)
- end = total_size - 1 if key + blocksize > total_size\
- else key + blocksize - 1
+ end = total_size - 1 if (
+ key + blocksize > total_size) else key + blocksize - 1
start, end = _range_up(key, end, filerange)
if start == end:
self._cb_next()
if_none_match=None,
if_modified_since=None,
if_unmodified_since=None):
- """Download an object to a string (multiple connections)
+ """Download an object to a string (multiple connections). This method
+ uses threads for http requests, but stores all content in memory.
:param obj: (str) remote object path
self.progress_bar_gen = download_cb(len(hash_list))
self._cb_next()
- ret = ''
+ num_of_blocks = len(remote_hashes)
+ ret = [''] * num_of_blocks
+ self._init_thread_limit()
+ flying = dict()
for blockid, blockhash in enumerate(remote_hashes):
start = blocksize * blockid
is_last = start + blocksize > total_size
end = (total_size - 1) if is_last else (start + blocksize - 1)
(start, end) = _range_up(start, end, range_str)
- if start == end:
- continue
- restargs['data_range'] = 'bytes=%s-%s' % (start, end)
- r = self.object_get(obj, success=(200, 206), **restargs)
- ret += r.content
- self._cb_next()
-
- self._complete_cb()
- return ret
+ if start < end:
+ self._watch_thread_limit(flying.values())
+ flying[blockid] = self._get_block_async(obj, **restargs)
+ for runid, thread in flying.items():
+ if (blockid + 1) == num_of_blocks:
+ thread.join()
+ elif thread.isAlive():
+ continue
+ if thread.exception:
+ raise thread.exception
+ ret[runid] = thread.value.content
+ self._cb_next()
+ flying.pop(runid)
+ return ''.join(ret)
#Command Progress Bar method
def _cb_next(self, step=1):