Revision 0fbc8a52
b/kamaki/clients/pithos/__init__.py | ||
---|---|---|
494 | 494 |
- e.g. if the range is 10-100, all blocks will be written to |
495 | 495 |
normal_position - 10 |
496 | 496 |
""" |
497 |
for i, (key, g) in enumerate(flying.items()):
|
|
497 |
for key, g in flying.items():
|
|
498 | 498 |
if g.isAlive(): |
499 | 499 |
continue |
500 | 500 |
if g.exception: |
... | ... | |
532 | 532 |
self._thread2file( |
533 | 533 |
flying, blockid_dict, local_file, offset, |
534 | 534 |
**restargs) |
535 |
end = total_size - 1 if key + blocksize > total_size\
|
|
536 |
else key + blocksize - 1 |
|
535 |
end = total_size - 1 if (
|
|
536 |
key + blocksize > total_size) else key + blocksize - 1
|
|
537 | 537 |
start, end = _range_up(key, end, filerange) |
538 | 538 |
if start == end: |
539 | 539 |
self._cb_next() |
... | ... | |
632 | 632 |
if_none_match=None, |
633 | 633 |
if_modified_since=None, |
634 | 634 |
if_unmodified_since=None): |
635 |
"""Download an object to a string (multiple connections) |
|
635 |
"""Download an object to a string (multiple connections). This method |
|
636 |
uses threads for http requests, but stores all content in memory. |
|
636 | 637 |
|
637 | 638 |
:param obj: (str) remote object path |
638 | 639 |
|
... | ... | |
672 | 673 |
self.progress_bar_gen = download_cb(len(hash_list)) |
673 | 674 |
self._cb_next() |
674 | 675 |
|
675 |
ret = '' |
|
676 |
num_of_blocks = len(remote_hashes) |
|
677 |
ret = [''] * num_of_blocks |
|
678 |
self._init_thread_limit() |
|
679 |
flying = dict() |
|
676 | 680 |
for blockid, blockhash in enumerate(remote_hashes): |
677 | 681 |
start = blocksize * blockid |
678 | 682 |
is_last = start + blocksize > total_size |
679 | 683 |
end = (total_size - 1) if is_last else (start + blocksize - 1) |
680 | 684 |
(start, end) = _range_up(start, end, range_str) |
681 |
if start == end: |
|
682 |
continue |
|
683 |
restargs['data_range'] = 'bytes=%s-%s' % (start, end) |
|
684 |
r = self.object_get(obj, success=(200, 206), **restargs) |
|
685 |
ret += r.content |
|
686 |
self._cb_next() |
|
687 |
|
|
688 |
self._complete_cb() |
|
689 |
return ret |
|
685 |
if start < end: |
|
686 |
self._watch_thread_limit(flying.values()) |
|
687 |
flying[blockid] = self._get_block_async(obj, **restargs) |
|
688 |
for runid, thread in flying.items(): |
|
689 |
if (blockid + 1) == num_of_blocks: |
|
690 |
thread.join() |
|
691 |
elif thread.isAlive(): |
|
692 |
continue |
|
693 |
if thread.exception: |
|
694 |
raise thread.exception |
|
695 |
ret[runid] = thread.value.content |
|
696 |
self._cb_next() |
|
697 |
flying.pop(runid) |
|
698 |
return ''.join(ret) |
|
690 | 699 |
|
691 | 700 |
#Command Progress Bar method |
692 | 701 |
def _cb_next(self, step=1): |
Also available in: Unified diff