Revision cad39033 kamaki/clients/pithos.py
b/kamaki/clients/pithos.py | ||
---|---|---|
98 | 98 |
|
99 | 99 |
def __init__(self, base_url, token, account=None, container=None): |
100 | 100 |
super(PithosClient, self).__init__(base_url, token, account, container) |
101 |
self.POOL_SIZE = 5 |
|
102 | 101 |
|
103 | 102 |
def purge_container(self): |
104 | 103 |
r = self.container_delete(until=unicode(time())) |
... | ... | |
239 | 238 |
upload_gen = upload_cb(len(missing)) |
240 | 239 |
upload_gen.next() |
241 | 240 |
|
242 |
thread_limit = 1 |
|
243 |
elapsed_old = 0.0 |
|
244 |
elapsed_new = 0.0 |
|
241 |
self._init_thread_limit() |
|
245 | 242 |
|
246 | 243 |
flying = [] |
247 | 244 |
for hash in missing: |
... | ... | |
253 | 250 |
unfinished = [] |
254 | 251 |
for i, thread in enumerate(flying): |
255 | 252 |
|
256 |
if elapsed_old: |
|
257 |
if elapsed_old >= elapsed_new\ |
|
258 |
and thread_limit < self.POOL_SIZE: |
|
259 |
thread_limit += 1 |
|
260 |
elif elapsed_old < elapsed_new and thread_limit > 1: |
|
261 |
thread_limit -= 1 |
|
262 |
|
|
263 |
if len(unfinished) >= thread_limit: |
|
264 |
elapsed_old = elapsed_new |
|
265 |
elapsed_new = 0.0 |
|
266 |
for unf in unfinished: |
|
267 |
begin_time = time() |
|
268 |
unf.join() |
|
269 |
elapsed_new += time() - begin_time |
|
270 |
elapsed_new = elapsed_new / len(unfinished) |
|
271 |
unfinished = [] |
|
253 |
unfinished = self._watch_thread_limit(unfinished) |
|
272 | 254 |
|
273 | 255 |
if thread.isAlive() or thread.exception: |
274 | 256 |
unfinished.append(thread) |
... | ... | |
410 | 392 |
blocks will be written to normal_position - 10""" |
411 | 393 |
finished = [] |
412 | 394 |
for i, (start, g) in enumerate(flying.items()): |
413 |
if i % self.POOL_SIZE == 0: |
|
414 |
g.join(0.1) |
|
395 |
#if i % self.POOL_SIZE == 0:
|
|
396 |
# g.join(0.1)
|
|
415 | 397 |
if not g.isAlive(): |
416 | 398 |
if g.exception: |
417 | 399 |
raise g.exception |
... | ... | |
442 | 424 |
rstart = int(filerange.split('-')[0]) |
443 | 425 |
offset = rstart if blocksize > rstart else rstart % blocksize |
444 | 426 |
|
427 |
self._init_thread_limit() |
|
445 | 428 |
for block_hash, blockid in remote_hashes.items(): |
446 | 429 |
start = blocksize * blockid |
447 | 430 |
if start < file_size\ |
... | ... | |
452 | 435 |
blockhash): |
453 | 436 |
self._cb_next() |
454 | 437 |
continue |
438 |
self._watch_thread_limit(flying.values()) |
|
455 | 439 |
finished += self._thread2file( |
456 | 440 |
flying, |
457 | 441 |
local_file, |
Also available in: Unified diff