Revision 28cbc3c2

b/kamaki/clients/pithos/__init__.py
442 442
        h.update(block.strip('\x00'))
443 443
        return hexlify(h.digest())
444 444

  
445
    def _thread2file(self, flying, local_file, offset=0, **restargs):
445
    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
446 446
        """write the results of a greenleted rest call to a file
447 447

  
448 448
        :param offset: the offset of the file up to blocksize
449 449
        - e.g. if the range is 10-100, all blocks will be written to
450 450
        normal_position - 10
451 451
        """
452
        finished = []
453
        for i, (start, g) in enumerate(flying.items()):
454
            if not g.isAlive():
455
                if g.exception:
456
                    raise g.exception
457
                block = g.value.content
458
                local_file.seek(start - offset)
452
        for i, (key, g) in enumerate(flying.items()):
453
            if g.isAlive():
454
                continue
455
            if g.exception:
456
                raise g.exception
457
            block = g.value.content
458
            for block_start in blockids[key]:
459
                local_file.seek(block_start + offset)
459 460
                local_file.write(block)
460 461
                self._cb_next()
461
                finished.append(flying.pop(start))
462
            flying.pop(key)
463
            blockids.pop(key)
462 464
        local_file.flush()
463
        return finished
464 465

  
465 466
    def _dump_blocks_async(
466 467
            self, obj, remote_hashes, blocksize, total_size, local_file,
467 468
            blockhash=None, resume=False, filerange=None, **restargs):
468 469
        file_size = fstat(local_file.fileno()).st_size if resume else 0
469
        flying = {}
470
        finished = []
470
        flying = dict()
471
        blockid_dict = dict()
471 472
        offset = 0
472 473
        if filerange is not None:
473 474
            rstart = int(filerange.split('-')[0])
......
475 476

  
476 477
        self._init_thread_limit()
477 478
        for block_hash, blockids in remote_hashes.items():
478
            for blockid in blockids:
479
                start = blocksize * blockid
480
                if start < file_size and block_hash == self._hash_from_file(
481
                        local_file, start, blocksize, blockhash):
482
                    self._cb_next()
483
                    continue
479
            blockids = [blk * blocksize for blk in blockids]
480
            unsaved = [blk for blk in blockids if not (
481
                blk < file_size and block_hash == self._hash_from_file(
482
                        local_file, blk, blocksize, blockhash))]
483
            self._cb_next(len(blockids) - len(unsaved))
484
            if unsaved:
485
                key = unsaved[0]
484 486
                self._watch_thread_limit(flying.values())
485
                finished += self._thread2file(
486
                    flying,
487
                    local_file,
488
                    offset,
487
                self._thread2file(
488
                    flying, blockid_dict, local_file, offset,
489 489
                    **restargs)
490
                end = total_size - 1 if start + blocksize > total_size\
491
                    else start + blocksize - 1
492
                (start, end) = _range_up(start, end, filerange)
490
                end = total_size - 1 if key + blocksize > total_size\
491
                    else key + blocksize - 1
492
                start, end = _range_up(key, end, filerange)
493 493
                if start == end:
494 494
                    self._cb_next()
495 495
                    continue
496 496
                restargs['async_headers'] = {
497 497
                    'Range': 'bytes=%s-%s' % (start, end)}
498
                flying[start] = self._get_block_async(obj, **restargs)
498
                flying[key] = self._get_block_async(obj, **restargs)
499
                blockid_dict[key] = unsaved
499 500

  
500 501
        for thread in flying.values():
501 502
            thread.join()
502
        finished += self._thread2file(flying, local_file, offset, **restargs)
503
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
503 504

  
504 505
    def download_object(
505 506
            self, obj, dst,
......
578 579
        self._complete_cb()
579 580

  
580 581
    #Command Progress Bar method
581
    def _cb_next(self):
582
    def _cb_next(self, step=1):
582 583
        if hasattr(self, 'progress_bar_gen'):
583 584
            try:
584
                self.progress_bar_gen.next()
585
                self.progress_bar_gen.next(step)
585 586
            except:
586 587
                pass
587 588

  
588 589
    def _complete_cb(self):
589 590
        while True:
590 591
            try:
591
                self.progress_bar_gen.next()
592
                self.progress_bar_gen.next(step)
592 593
            except:
593 594
                break
594 595

  

Also available in: Unified diff