Revision 28cbc3c2
b/kamaki/clients/pithos/__init__.py | ||
---|---|---|
442 | 442 |
h.update(block.strip('\x00')) |
443 | 443 |
return hexlify(h.digest()) |
444 | 444 |
|
445 |
def _thread2file(self, flying, local_file, offset=0, **restargs): |
|
445 |
def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
|
|
446 | 446 |
"""write the results of a greenleted rest call to a file |
447 | 447 |
|
448 | 448 |
:param offset: the offset of the file up to blocksize |
449 | 449 |
- e.g. if the range is 10-100, all blocks will be written to |
450 | 450 |
normal_position - 10 |
451 | 451 |
""" |
452 |
finished = [] |
|
453 |
for i, (start, g) in enumerate(flying.items()): |
|
454 |
if not g.isAlive(): |
|
455 |
if g.exception: |
|
456 |
raise g.exception |
|
457 |
block = g.value.content |
|
458 |
local_file.seek(start - offset) |
|
452 |
for i, (key, g) in enumerate(flying.items()): |
|
453 |
if g.isAlive(): |
|
454 |
continue |
|
455 |
if g.exception: |
|
456 |
raise g.exception |
|
457 |
block = g.value.content |
|
458 |
for block_start in blockids[key]: |
|
459 |
local_file.seek(block_start + offset) |
|
459 | 460 |
local_file.write(block) |
460 | 461 |
self._cb_next() |
461 |
finished.append(flying.pop(start)) |
|
462 |
flying.pop(key) |
|
463 |
blockids.pop(key) |
|
462 | 464 |
local_file.flush() |
463 |
return finished |
|
464 | 465 |
|
465 | 466 |
def _dump_blocks_async( |
466 | 467 |
self, obj, remote_hashes, blocksize, total_size, local_file, |
467 | 468 |
blockhash=None, resume=False, filerange=None, **restargs): |
468 | 469 |
file_size = fstat(local_file.fileno()).st_size if resume else 0 |
469 |
flying = {}
|
|
470 |
finished = []
|
|
470 |
flying = dict()
|
|
471 |
blockid_dict = dict()
|
|
471 | 472 |
offset = 0 |
472 | 473 |
if filerange is not None: |
473 | 474 |
rstart = int(filerange.split('-')[0]) |
... | ... | |
475 | 476 |
|
476 | 477 |
self._init_thread_limit() |
477 | 478 |
for block_hash, blockids in remote_hashes.items(): |
478 |
for blockid in blockids: |
|
479 |
start = blocksize * blockid |
|
480 |
if start < file_size and block_hash == self._hash_from_file( |
|
481 |
local_file, start, blocksize, blockhash): |
|
482 |
self._cb_next() |
|
483 |
continue |
|
479 |
blockids = [blk * blocksize for blk in blockids] |
|
480 |
unsaved = [blk for blk in blockids if not ( |
|
481 |
blk < file_size and block_hash == self._hash_from_file( |
|
482 |
local_file, blk, blocksize, blockhash))] |
|
483 |
self._cb_next(len(blockids) - len(unsaved)) |
|
484 |
if unsaved: |
|
485 |
key = unsaved[0] |
|
484 | 486 |
self._watch_thread_limit(flying.values()) |
485 |
finished += self._thread2file( |
|
486 |
flying, |
|
487 |
local_file, |
|
488 |
offset, |
|
487 |
self._thread2file( |
|
488 |
flying, blockid_dict, local_file, offset, |
|
489 | 489 |
**restargs) |
490 |
end = total_size - 1 if start + blocksize > total_size\
|
|
491 |
else start + blocksize - 1
|
|
492 |
(start, end) = _range_up(start, end, filerange)
|
|
490 |
end = total_size - 1 if key + blocksize > total_size\
|
|
491 |
else key + blocksize - 1
|
|
492 |
start, end = _range_up(key, end, filerange)
|
|
493 | 493 |
if start == end: |
494 | 494 |
self._cb_next() |
495 | 495 |
continue |
496 | 496 |
restargs['async_headers'] = { |
497 | 497 |
'Range': 'bytes=%s-%s' % (start, end)} |
498 |
flying[start] = self._get_block_async(obj, **restargs) |
|
498 |
flying[key] = self._get_block_async(obj, **restargs) |
|
499 |
blockid_dict[key] = unsaved |
|
499 | 500 |
|
500 | 501 |
for thread in flying.values(): |
501 | 502 |
thread.join() |
502 |
finished += self._thread2file(flying, local_file, offset, **restargs)
|
|
503 |
self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
|
|
503 | 504 |
|
504 | 505 |
def download_object( |
505 | 506 |
self, obj, dst, |
... | ... | |
578 | 579 |
self._complete_cb() |
579 | 580 |
|
580 | 581 |
#Command Progress Bar method |
581 |
def _cb_next(self): |
|
582 |
def _cb_next(self, step=1):
|
|
582 | 583 |
if hasattr(self, 'progress_bar_gen'): |
583 | 584 |
try: |
584 |
self.progress_bar_gen.next() |
|
585 |
self.progress_bar_gen.next(step)
|
|
585 | 586 |
except: |
586 | 587 |
pass |
587 | 588 |
|
588 | 589 |
def _complete_cb(self): |
589 | 590 |
while True: |
590 | 591 |
try: |
591 |
self.progress_bar_gen.next() |
|
592 |
self.progress_bar_gen.next(step)
|
|
592 | 593 |
except: |
593 | 594 |
break |
594 | 595 |
|
Also available in: Unified diff