Revision 6fa30b1b kamaki/clients/pithos/__init__.py
b/kamaki/clients/pithos/__init__.py | ||
---|---|---|
417 | 417 |
else: |
418 | 418 |
break |
419 | 419 |
if missing: |
420 |
raise ClientError('%s blocks failed to upload' % len(missing)) |
|
420 |
raise ClientError( |
|
421 |
'%s blocks failed to upload' % len(missing), |
|
422 |
details=['%s' % thread.exception for thread in missing]) |
|
421 | 423 |
except KeyboardInterrupt: |
422 | 424 |
sendlog.info('- - - wait for threads to finish') |
423 | 425 |
for thread in activethreads(): |
... | ... | |
438 | 440 |
success=201) |
439 | 441 |
return r.headers |
440 | 442 |
|
441 |
|
|
442 | 443 |
def upload_from_string( |
443 | 444 |
self, obj, input_str, |
444 | 445 |
hash_cb=None, |
... | ... | |
497 | 498 |
num_of_blocks, blockmod = size / blocksize, size % blocksize |
498 | 499 |
num_of_blocks += (1 if blockmod else 0) if num_of_blocks else blockmod |
499 | 500 |
|
500 |
hashes = {}
|
|
501 |
hashes = []
|
|
501 | 502 |
hmap = {} |
502 | 503 |
for blockid in range(num_of_blocks): |
503 | 504 |
start = blockid * blocksize |
504 | 505 |
block = input_str[start: (start + blocksize)] |
505 |
hashes[blockid] = _pithos_hash(block, blockhash)
|
|
506 |
hashes.append(_pithos_hash(block, blockhash))
|
|
506 | 507 |
hmap[hashes[blockid]] = (start, block) |
507 | 508 |
|
508 | 509 |
hashmap = dict(bytes=size, hashes=hashes) |
... | ... | |
525 | 526 |
for i in range(num_of_blocks + 1 - num_of_missing): |
526 | 527 |
self._cb_next() |
527 | 528 |
|
529 |
tries = 7 |
|
530 |
old_failures = 0 |
|
528 | 531 |
try: |
529 |
flying = [] |
|
530 |
for hash in missing: |
|
531 |
offset, block = hmap[hash] |
|
532 |
bird = self._put_block_async(block, hash) |
|
533 |
flying.append(bird) |
|
534 |
unfinished = self._watch_thread_limit(flying) |
|
535 |
for thread in set(flying).difference(unfinished): |
|
532 |
while tries and missing: |
|
533 |
flying = [] |
|
534 |
failures = [] |
|
535 |
for hash in missing: |
|
536 |
offset, block = hmap[hash] |
|
537 |
bird = self._put_block_async(block, hash) |
|
538 |
flying.append(bird) |
|
539 |
unfinished = self._watch_thread_limit(flying) |
|
540 |
for thread in set(flying).difference(unfinished): |
|
541 |
if thread.exception: |
|
542 |
failures.append(thread.kwargs['hash']) |
|
543 |
if thread.isAlive(): |
|
544 |
flying.append(thread) |
|
545 |
else: |
|
546 |
self._cb_next() |
|
547 |
flying = unfinished |
|
548 |
for thread in flying: |
|
549 |
thread.join() |
|
536 | 550 |
if thread.exception: |
537 |
raise thread.exception |
|
538 |
if thread.isAlive(): |
|
539 |
flying.append(thread) |
|
540 |
else: |
|
541 |
self._cb_next() |
|
542 |
flying = unfinished |
|
543 |
for thread in flying: |
|
544 |
thread.join() |
|
545 |
if thread.exception: |
|
546 |
raise thread.exception |
|
547 |
self._cb_next() |
|
548 |
|
|
551 |
failures.append(thread.kwargs['hash']) |
|
552 |
self._cb_next() |
|
553 |
missing = failures |
|
554 |
if missing and len(missing) == old_failures: |
|
555 |
tries -= 1 |
|
556 |
old_failures = len(missing) |
|
557 |
if missing: |
|
558 |
raise ClientError( |
|
559 |
'%s blocks failed to upload' % len(missing), |
|
560 |
details=['%s' % thread.exception for thread in missing]) |
|
549 | 561 |
except KeyboardInterrupt: |
550 | 562 |
sendlog.info('- - - wait for threads to finish') |
551 | 563 |
for thread in activethreads(): |
Also available in: Unified diff