Revision 9a7efb0d kamaki/clients/pithos.py

b/kamaki/clients/pithos.py
244 244
        file_hashmap = HashMap(blocksize, blockhash)
245 245
        file_hashmap.load(local_file, hasattr(self, 'progress_bar_gen'))
246 246

  
247
        #filter out blocks that are already downloaded
248 247
        for i, x in enumerate(file_hashmap):
249 248
            local_hash = hexlify(x)
250 249
            if local_hash in remote_hashes:
......
263 262
                    gevent.Greenlet._report_error(self, exc_info)
264 263
                finally:
265 264
                    sys.stderr = _stderr
266
        self.POOL_SIZE = 5
265
        if not hasattr(self, 'POOL_SIZE'):
266
            self.POOL_SIZE = 5
267 267
        if self.async_pool is None:
268 268
            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
269 269
        g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
......
273 273
    def _greenlet2file(self, flying_greenlets, local_file, broken={}, **restargs):
274 274
        finished = []
275 275
        for start, g in flying_greenlets.items():
276
            print('\tIs g ID(%s) ready? %s'%(self.mmaapp[start], g.ready()))
276 277
            if g.ready():
277 278
                if g.exception:
278
                    g.release()
279 279
                    raise g.exception
280 280
                try:
281 281
                    block = g.value.content
282 282
                except AttributeError:
283 283
                    broken[start] = flying_greenlets.pop(start)
284
                    #g.spawn()
284 285
                    continue
285 286
                local_file.seek(start)
287
                print('\tID(%s) [%s...]\n\tg.value:%s\n\tg:%s\n'%(self.mmaapp[start], block[1:10],
288
                    g.value, g))
289
                print('\tID(%s): g.value.request: %s\n---'%(self.mmaapp[start], g.value.request))
286 290
                local_file.write(block)
287 291
                #local_file.flush()
288 292
                self._cb_next()
......
291 295
        return finished
292 296

  
293 297
    def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file, **restargs):
294

  
295
        #let the fly
296 298
        flying_greenlets = {}
297 299
        finished_greenlets = []
298 300
        broken = {}
301
        self.mmaapp = {}
299 302
        for block_hash, blockid in remote_hashes.items():
303
            if len(flying_greenlets) >= self.POOL_SIZE:
304
                finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
305
                    **restargs)
300 306
            start = blocksize*blockid
307
            self.mmaapp[start] = blockid
301 308
            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
302
            restargs['data_range'] = 'bytes=%s-%s'%(start, end)
303
            #store info for relaunching greenlet if needed
309
            restargs['async_headers'] = dict(data_range='bytes=%s-%s'%(start, end))
310
            print('ID(%s) get_grnlt {'%blockid)
304 311
            flying_greenlets[start] = self._get_block_async(obj, **restargs)
305
            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
306
                **restargs)
312
            print('ID(%s) got_grnlt }'%blockid)
307 313

  
308 314
        #check the greenlets
309 315
        while len(flying_greenlets) > 0:
......
333 339
            hash_list, 
334 340
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
335 341
        assert total_size >= 0
342
        self.POOL_SIZE = 5
336 343

  
337 344
        if download_cb:
338 345
            self.progress_bar_gen = download_cb(len(remote_hashes)+1)

Also available in: Unified diff