Revision fb0cd49a
b/kamaki/clients/pithos.py | ||
---|---|---|
252 | 252 |
#resume if some blocks have been downloaded |
253 | 253 |
resumed[existing_hash] = i |
254 | 254 |
if with_progress_bar: |
255 |
download_gen.next() |
|
255 |
try: |
|
256 |
download_gen.next() |
|
257 |
except: |
|
258 |
pass |
|
256 | 259 |
elif not overide: |
257 | 260 |
raise ClientError(message='Local file is substantialy different', |
258 | 261 |
status=600) |
... | ... | |
270 | 273 |
end = custom_end |
271 | 274 |
return (start, end) |
272 | 275 |
|
273 |
def _manage_finished_downloading_greenlets(self, flying, objfile, sleeptime=0):
|
|
276 |
def _manage_downloading_greenlets(self, flying, objfile, broken_greenlets = [], sleeptime=0):
|
|
274 | 277 |
newflying = [] |
275 | 278 |
for v in flying: |
276 | 279 |
h = v['handler'] |
... | ... | |
278 | 281 |
if h.exception: |
279 | 282 |
h.release() |
280 | 283 |
raise h.exception |
284 |
try: |
|
285 |
block = h.value.content |
|
286 |
except AttributeError: |
|
287 |
#catch greenlets that break due to an eventlist bug |
|
288 |
print('- - - - - > Got a borken greenlet here') |
|
289 |
broken_greenlets.append(v) |
|
290 |
continue |
|
281 | 291 |
objfile.seek(v['start']) |
282 |
objfile.write(h.value.content)
|
|
292 |
objfile.write(block)
|
|
283 | 293 |
objfile.flush() |
284 | 294 |
else: |
285 | 295 |
#if there are unfinished greenlets, sleep for some time - be carefull with that |
... | ... | |
299 | 309 |
gevent.Greenlet._report_error(self, exc_info) |
300 | 310 |
finally: |
301 | 311 |
sys.stderr = _stderr |
302 |
POOL_SIZE =5
|
|
312 |
POOL_SIZE =7
|
|
303 | 313 |
if self.async_pool is None: |
304 | 314 |
self.async_pool = gevent.pool.Pool(size=POOL_SIZE) |
305 | 315 |
g = SilentGreenlet(self._get_block, obj, **kwargs) |
306 | 316 |
self.async_pool.start(g) |
307 | 317 |
return g |
318 |
|
|
308 | 319 |
def _async_download_missing_blocks(self, obj, objfile, hmap, resumed, blocksize, total_size, |
309 | 320 |
download_gen=None, custom_start = None, custom_end=None, **restargs): |
310 | 321 |
"""Attempt pseudo-multithreaded (with greenlets) download of blocks, or if that |
... | ... | |
326 | 337 |
data_range = 'bytes=%s-%s'%(start, end) |
327 | 338 |
handler = self._get_block_async(obj, data_range=data_range, **restargs) |
328 | 339 |
flying.append({'handler':handler, 'start':start, 'data_range':data_range}) |
329 |
flying = self._manage_finished_downloading_greenlets(flying, objfile) |
|
340 |
broken = [] |
|
341 |
flying = self._manage_downloading_greenlets(flying, objfile, broken_greenlets=broken) |
|
342 |
#workaround for eventlib bug that breaks greenlets: replace them with new ones |
|
343 |
for brgr in broken: |
|
344 |
brgr['handler'] = self._get_block_async(obj, data_range=brgr['data_range'], |
|
345 |
**restargs) |
|
346 |
flying.append(brgr) |
|
330 | 347 |
|
331 | 348 |
#write the last results and exit |
332 | 349 |
while len(flying) > 0: |
333 |
flying=self._manage_finished_downloading_greenlets(flying, objfile, sleeptime=0.1) |
|
350 |
broken = [] |
|
351 |
flying=self._manage_downloading_greenlets(flying, objfile, broken_greenlets=broken, |
|
352 |
sleeptime=0.1) |
|
353 |
#workaround for eventlib bug that breaks greenlets: replace them with new ones |
|
354 |
for brgr in broken: |
|
355 |
brgr['handler'] = self._get_block_async(obj, data_range=brgr['data_range'], |
|
356 |
**restargs) |
|
357 |
flying.append(brgr) |
|
334 | 358 |
objfile.truncate(total_size) |
335 | 359 |
|
336 | 360 |
gevent.joinall(flying) |
Also available in: Unified diff