Revision cad39033

b/kamaki/cli/command_shell.py
133 133
            self._unregister_method('complete_%s' % subname)
134 134
            self._unregister_method('help_%s' % subname)
135 135

  
136

  
137 136
    @classmethod
138 137
    def _backup(self):
139 138
        return dict(self.__dict__)
b/kamaki/clients/__init__.py
32 32
# or implied, of GRNET S.A.
33 33

  
34 34
from json import dumps, loads
35
from time import time
35 36
import logging
36 37
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
37 38

  
......
68 69

  
69 70

  
70 71
class Client(object):
72
    POOL_SIZE = 5
73

  
71 74
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
72 75
        self.base_url = base_url
73 76
        self.token = token
......
77 80
            "%a, %d %b %Y %H:%M:%S GMT"]
78 81
        self.http_client = http_client
79 82

  
83
    def _init_thread_limit(self, limit=1):
84
        self._thread_limit = limit
85
        self._elapsed_old = 0.0
86
        self._elapsed_new = 0.0
87

  
88
    def _watch_thread_limit(self, threadlist):
89
        if self._elapsed_old > self._elapsed_new\
90
        and self._thread_limit < self.POOL_SIZE:
91
            self._thread_limit += 1
92
        elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
93
            self._thread_limit -= 1
94

  
95
        self._elapsed_old = self._elapsed_new
96
        if len(threadlist) >= self._thread_limit:
97
            self._elapsed_new = 0.0
98
            for thread in threadlist:
99
                begin_time = time()
100
                thread.join()
101
                self._elapsed_new += time() - begin_time
102
            self._elapsed_new = self._elapsed_new / len(threadlist)
103
            return []
104
        return threadlist
105

  
80 106
    def _raise_for_status(self, r):
81 107
        status_msg = getattr(r, 'status', '')
82 108
        try:
b/kamaki/clients/pithos.py
98 98

  
99 99
    def __init__(self, base_url, token, account=None, container=None):
100 100
        super(PithosClient, self).__init__(base_url, token, account, container)
101
        self.POOL_SIZE = 5
102 101

  
103 102
    def purge_container(self):
104 103
        r = self.container_delete(until=unicode(time()))
......
239 238
            upload_gen = upload_cb(len(missing))
240 239
            upload_gen.next()
241 240

  
242
        thread_limit = 1
243
        elapsed_old = 0.0
244
        elapsed_new = 0.0
241
        self._init_thread_limit()
245 242

  
246 243
        flying = []
247 244
        for hash in missing:
......
253 250
            unfinished = []
254 251
            for i, thread in enumerate(flying):
255 252

  
256
                if elapsed_old:
257
                    if elapsed_old >= elapsed_new\
258
                    and thread_limit < self.POOL_SIZE:
259
                        thread_limit += 1
260
                    elif elapsed_old < elapsed_new and thread_limit > 1:
261
                        thread_limit -= 1
262

  
263
                if len(unfinished) >= thread_limit:
264
                    elapsed_old = elapsed_new
265
                    elapsed_new = 0.0
266
                    for unf in unfinished:
267
                        begin_time = time()
268
                        unf.join()
269
                        elapsed_new += time() - begin_time
270
                    elapsed_new = elapsed_new / len(unfinished)
271
                    unfinished = []
253
                unfinished = self._watch_thread_limit(unfinished)
272 254

  
273 255
                if thread.isAlive() or thread.exception:
274 256
                    unfinished.append(thread)
......
410 392
        blocks will be written to normal_position - 10"""
411 393
        finished = []
412 394
        for i, (start, g) in enumerate(flying.items()):
413
            if i % self.POOL_SIZE == 0:
414
                g.join(0.1)
395
            #if i % self.POOL_SIZE == 0:
396
            #    g.join(0.1)
415 397
            if not g.isAlive():
416 398
                if g.exception:
417 399
                    raise g.exception
......
442 424
            rstart = int(filerange.split('-')[0])
443 425
            offset = rstart if blocksize > rstart else rstart % blocksize
444 426

  
427
        self._init_thread_limit()
445 428
        for block_hash, blockid in remote_hashes.items():
446 429
            start = blocksize * blockid
447 430
            if start < file_size\
......
452 435
                    blockhash):
453 436
                self._cb_next()
454 437
                continue
438
            self._watch_thread_limit(flying.values())
455 439
            finished += self._thread2file(
456 440
                flying,
457 441
                local_file,

Also available in: Unified diff