Revision cad39033
b/kamaki/cli/command_shell.py | ||
---|---|---|
133 | 133 |
self._unregister_method('complete_%s' % subname) |
134 | 134 |
self._unregister_method('help_%s' % subname) |
135 | 135 |
|
136 |
|
|
137 | 136 |
@classmethod |
138 | 137 |
def _backup(self): |
139 | 138 |
return dict(self.__dict__) |
b/kamaki/clients/__init__.py | ||
---|---|---|
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 | 34 |
from json import dumps, loads |
35 |
from time import time |
|
35 | 36 |
import logging |
36 | 37 |
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection |
37 | 38 |
|
... | ... | |
68 | 69 |
|
69 | 70 |
|
70 | 71 |
class Client(object): |
72 |
POOL_SIZE = 5 |
|
73 |
|
|
71 | 74 |
def __init__(self, base_url, token, http_client=KamakiHTTPConnection()): |
72 | 75 |
self.base_url = base_url |
73 | 76 |
self.token = token |
... | ... | |
77 | 80 |
"%a, %d %b %Y %H:%M:%S GMT"] |
78 | 81 |
self.http_client = http_client |
79 | 82 |
|
83 |
def _init_thread_limit(self, limit=1): |
|
84 |
self._thread_limit = limit |
|
85 |
self._elapsed_old = 0.0 |
|
86 |
self._elapsed_new = 0.0 |
|
87 |
|
|
88 |
def _watch_thread_limit(self, threadlist): |
|
89 |
if self._elapsed_old > self._elapsed_new\ |
|
90 |
and self._thread_limit < self.POOL_SIZE: |
|
91 |
self._thread_limit += 1 |
|
92 |
elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1: |
|
93 |
self._thread_limit -= 1 |
|
94 |
|
|
95 |
self._elapsed_old = self._elapsed_new |
|
96 |
if len(threadlist) >= self._thread_limit: |
|
97 |
self._elapsed_new = 0.0 |
|
98 |
for thread in threadlist: |
|
99 |
begin_time = time() |
|
100 |
thread.join() |
|
101 |
self._elapsed_new += time() - begin_time |
|
102 |
self._elapsed_new = self._elapsed_new / len(threadlist) |
|
103 |
return [] |
|
104 |
return threadlist |
|
105 |
|
|
80 | 106 |
def _raise_for_status(self, r): |
81 | 107 |
status_msg = getattr(r, 'status', '') |
82 | 108 |
try: |
b/kamaki/clients/pithos.py | ||
---|---|---|
98 | 98 |
|
99 | 99 |
def __init__(self, base_url, token, account=None, container=None): |
100 | 100 |
super(PithosClient, self).__init__(base_url, token, account, container) |
101 |
self.POOL_SIZE = 5 |
|
102 | 101 |
|
103 | 102 |
def purge_container(self): |
104 | 103 |
r = self.container_delete(until=unicode(time())) |
... | ... | |
239 | 238 |
upload_gen = upload_cb(len(missing)) |
240 | 239 |
upload_gen.next() |
241 | 240 |
|
242 |
thread_limit = 1 |
|
243 |
elapsed_old = 0.0 |
|
244 |
elapsed_new = 0.0 |
|
241 |
self._init_thread_limit() |
|
245 | 242 |
|
246 | 243 |
flying = [] |
247 | 244 |
for hash in missing: |
... | ... | |
253 | 250 |
unfinished = [] |
254 | 251 |
for i, thread in enumerate(flying): |
255 | 252 |
|
256 |
if elapsed_old: |
|
257 |
if elapsed_old >= elapsed_new\ |
|
258 |
and thread_limit < self.POOL_SIZE: |
|
259 |
thread_limit += 1 |
|
260 |
elif elapsed_old < elapsed_new and thread_limit > 1: |
|
261 |
thread_limit -= 1 |
|
262 |
|
|
263 |
if len(unfinished) >= thread_limit: |
|
264 |
elapsed_old = elapsed_new |
|
265 |
elapsed_new = 0.0 |
|
266 |
for unf in unfinished: |
|
267 |
begin_time = time() |
|
268 |
unf.join() |
|
269 |
elapsed_new += time() - begin_time |
|
270 |
elapsed_new = elapsed_new / len(unfinished) |
|
271 |
unfinished = [] |
|
253 |
unfinished = self._watch_thread_limit(unfinished) |
|
272 | 254 |
|
273 | 255 |
if thread.isAlive() or thread.exception: |
274 | 256 |
unfinished.append(thread) |
... | ... | |
410 | 392 |
blocks will be written to normal_position - 10""" |
411 | 393 |
finished = [] |
412 | 394 |
for i, (start, g) in enumerate(flying.items()): |
413 |
if i % self.POOL_SIZE == 0: |
|
414 |
g.join(0.1) |
|
395 |
#if i % self.POOL_SIZE == 0:
|
|
396 |
# g.join(0.1)
|
|
415 | 397 |
if not g.isAlive(): |
416 | 398 |
if g.exception: |
417 | 399 |
raise g.exception |
... | ... | |
442 | 424 |
rstart = int(filerange.split('-')[0]) |
443 | 425 |
offset = rstart if blocksize > rstart else rstart % blocksize |
444 | 426 |
|
427 |
self._init_thread_limit() |
|
445 | 428 |
for block_hash, blockid in remote_hashes.items(): |
446 | 429 |
start = blocksize * blockid |
447 | 430 |
if start < file_size\ |
... | ... | |
452 | 435 |
blockhash): |
453 | 436 |
self._cb_next() |
454 | 437 |
continue |
438 |
self._watch_thread_limit(flying.values()) |
|
455 | 439 |
finished += self._thread2file( |
456 | 440 |
flying, |
457 | 441 |
local_file, |
Also available in: Unified diff