Revision e02728f9

b/kamaki/cli/__init__.py
34 34

  
35 35
from __future__ import print_function
36 36

  
37
import gevent.monkey
38
#Monkey-patch everything for gevent early on
39
gevent.monkey.patch_all()
40

  
41 37
import logging
42 38

  
43 39
from inspect import getargspec
b/kamaki/clients/connection/kamakicon.py
36 36
from synnefo.lib.pool.http import get_http_connection
37 37
from kamaki.clients.connection import HTTPConnection, HTTPResponse,\
38 38
    HTTPConnectionError
39
from gevent.dns import DNSError
40 39
from socket import gaierror
41 40

  
42 41
from json import loads
......
141 140
                body=data)
142 141
        except Exception as err:
143 142
            conn.close()
144
            if isinstance(err, DNSError) or isinstance(err, gaierror):
143
            if isinstance(err, gaierror):
145 144
                raise HTTPConnectionError('Cannot connect to %s' % self.url,
146 145
                    status=701,
147 146
                    details='%s: %s' % (type(err), unicode(err)))
b/kamaki/clients/connection/tests.py
31 31
# interpreted as representing official policies, either expressed
32 32
# or implied, of GRNET S.A.
33 33

  
34
import gevent
35
import gevent.monkey
36
# Monkey-patch everything for gevent early on
37
gevent.monkey.patch_all()
38
import gevent.pool
39

  
40 34
import unittest
41 35
import sys
42 36
from StringIO import StringIO
......
68 62
    def tearDown(self):
69 63
        pass
70 64

  
65
    """
71 66
    def _get_async_content(self, con, **kwargs):
72 67
        class SilentGreenlet(gevent.Greenlet):
73 68
            def _report_error(self, exc_info):
......
84 79
        g = SilentGreenlet(self._get_content_len, con, **kwargs)
85 80
        self.async_pool.start(g)
86 81
        return g
82
    """
87 83

  
88 84
    def _get_content_len(self, con, **kwargs):
89 85
        r = con.perform_request('GET', **kwargs)
......
113 109
        self.assertNotEqual(r2, r4)
114 110
        #print('1:%s 2:%s 3:%s 4:%s 5:%s'%(r1, r2, r3, r4, r5))
115 111

  
116
        gevent.joinall([h1, h2, h3, h4, h5])
117

  
118 112
if __name__ == '__main__':
119 113
    suiteFew = unittest.TestSuite()
120 114
    suiteFew.addTest(unittest.makeSuite(testKamakiCon))
b/kamaki/clients/pithos.py
31 31
# interpreted as representing official policies, either expressed
32 32
# or implied, of GRNET S.A.
33 33

  
34
import gevent
35
#import gevent.monkey
36
# Monkey-patch everything for gevent early on
37
#gevent.monkey.patch_all()
38
import gevent.pool
34
from threading import Thread
39 35

  
40 36
from os import fstat
41 37
from hashlib import new as newhashlib
42
from time import time, sleep
43
import sys
38
from time import time
44 39

  
45 40
from binascii import hexlify
46 41

  
......
69 64
    return (start, end)
70 65

  
71 66

  
67
class SilentEvent(Thread):
68
    """ Thread-run method(*args, **kwargs)
69
        put exception in exception_bucket
70
    """
71
    def __init__(self, method, *args, **kwargs):
72
        super(self.__class__, self).__init__()
73
        self.method = method
74
        self.args = args
75
        self.kwargs = kwargs
76

  
77
    @property
78
    def exception(self):
79
        return getattr(self, '_exception', False)
80

  
81
    @property
82
    def value(self):
83
        return getattr(self, '_value', None)
84

  
85
    def run(self):
86
        try:
87
            self._value = self.method(*(self.args), **(self.kwargs))
88
        except Exception as e:
89
            print('______\n%s\n_______' % e)
90
            self._exception = e
91

  
92

  
72 93
class PithosClient(PithosRestAPI):
73 94
    """GRNet Pithos API client"""
74 95

  
96
    _thread_exceptions = []
97

  
75 98
    def __init__(self, base_url, token, account=None, container=None):
76 99
        super(PithosClient, self).__init__(base_url, token, account, container)
77 100
        self.async_pool = None
......
117 140

  
118 141
    # upload_* auxiliary methods
119 142
    def put_block_async(self, data, hash):
120
        class SilentGreenlet(gevent.Greenlet):
121
            def _report_error(self, exc_info):
122
                try:
123
                    sys.stderr = StringIO()
124
                    gevent.Greenlet._report_error(self, exc_info)
125
                finally:
126
                    if hasattr(sys, '_stderr'):
127
                        sys.stderr = sys._stderr
128
        POOL_SIZE = self.POOL_SIZE if hasattr(self, 'POOL_SIZE') else 5
129
        if self.async_pool is None:
130
            self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
131
        g = SilentGreenlet(self.put_block, data, hash)
132
        self.async_pool.start(g)
133
        return g
143
        event = SilentEvent(method=self.put_block, data=data, hash=hash)
144
        event.start()
145
        return event
134 146

  
135 147
    def put_block(self, data, hash):
136 148
        r = self.container_post(update=True,
......
233 245
            data = fileobj.read(bytes)
234 246
            r = self.put_block_async(data, hash)
235 247
            flying.append(r)
236
            for r in flying:
237
                if r.ready():
238
                    if r.exception:
239
                        raise r.exception
248
            unfinished = []
249
            for thread in flying:
250
                if thread.isAlive() or thread.exception:
251
                    unfinished.append(thread)
252
                else:
240 253
                    if upload_cb:
241 254
                        upload_gen.next()
242
            flying = [r for r in flying if not r.ready()]
255
            flying = unfinished
243 256
        while upload_cb:
244 257
            try:
245 258
                upload_gen.next()
246 259
            except StopIteration:
247 260
                break
248
        gevent.joinall(flying)
261

  
262
        for thread in flying:
263
            thread.join()
249 264

  
250 265
        failures = [r for r in flying if r.exception]
251 266
        if len(failures):
252
            details = ', '.join(['(%s).%s' % (i, r.exception)\
267
            details = ', '.join([' (%s).%s' % (i, r.exception)\
253 268
                for i, r in enumerate(failures)])
254 269
            raise ClientError(message="Block uploading failed",
255 270
                status=505,
......
340 355
            dst.flush()
341 356

  
342 357
    def _get_block_async(self, obj, **restargs):
343
        class SilentGreenlet(gevent.Greenlet):
344
            def _report_error(self, exc_info):
345
                try:
346
                    sys.stderr = StringIO()
347
                    gevent.Greenlet._report_error(self, exc_info)
348
                finally:
349
                    if hasattr(sys, '_stderr'):
350
                        sys.stderr = sys._stderr
351
        if not hasattr(self, 'POOL_SIZE'):
352
            self.POOL_SIZE = 5
353
        if self.async_pool is None:
354
            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
355
        g = SilentGreenlet(self.object_get, obj,
358
        event = SilentEvent(self.object_get,
359
            obj,
356 360
            success=(200, 206),
357 361
            **restargs)
358
        self.async_pool.start(g)
359
        return g
362
        event.start()
363
        return event
360 364

  
361 365
    def _hash_from_file(self, fp, start, size, blockhash):
362 366
        fp.seek(start)
......
365 369
        h.update(block.strip('\x00'))
366 370
        return hexlify(h.digest())
367 371

  
368
    def _greenlet2file(self,
369
        flying_greenlets,
372
    def _thread2file(self,
373
        flying,
370 374
        local_file,
371 375
        offset=0,
372 376
        **restargs):
......
375 379
            - e.g. if the range is 10-100, all
376 380
        blocks will be written to normal_position - 10"""
377 381
        finished = []
378
        for start, g in flying_greenlets.items():
379
            if g.ready():
382
        for start, g in flying.items():
383
            if not g.isAlive():
380 384
                if g.exception:
381 385
                    raise g.exception
382 386
                block = g.value.content
383 387
                local_file.seek(start - offset)
384 388
                local_file.write(block)
385 389
                self._cb_next()
386
                finished.append(flying_greenlets.pop(start))
390
                finished.append(flying.pop(start))
387 391
        local_file.flush()
388 392
        return finished
389 393

  
......
399 403
        **restargs):
400 404

  
401 405
        file_size = fstat(local_file.fileno()).st_size if resume else 0
402
        flying_greenlets = {}
403
        finished_greenlets = []
406
        flying = {}
407
        finished = []
404 408
        offset = 0
405 409
        if filerange is not None:
406 410
            rstart = int(filerange.split('-')[0])
......
414 418
                blockhash):
415 419
                self._cb_next()
416 420
                continue
417
            if len(flying_greenlets) >= self.POOL_SIZE:
418
                finished_greenlets += self._greenlet2file(flying_greenlets,
421
            if len(flying) >= self.POOL_SIZE:
422
                finished += self._thread2file(flying,
419 423
                    local_file,
420 424
                    offset,
421 425
                    **restargs)
......
426 430
                self._cb_next()
427 431
                continue
428 432
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
429
            flying_greenlets[start] = self._get_block_async(obj, **restargs)
430

  
431
        #check the greenlets
432
        while len(flying_greenlets) > 0:
433
            sleep(0.001)
434
            finished_greenlets += self._greenlet2file(flying_greenlets,
435
                local_file,
436
                offset,
437
                **restargs)
433
            flying[start] = self._get_block_async(obj, **restargs)
438 434

  
439
        gevent.joinall(finished_greenlets)
435
        for thread in flying.values():
436
            thread.join()
437
        finished += self._thread2file(flying, local_file, offset, **restargs)
440 438

  
441 439
    def download_object(self,
442 440
        obj,
b/kamaki/clients/tests.py
31 31
# interpreted as representing official policies, either expressed
32 32
# or implied, of GRNET S.A.
33 33

  
34
import gevent.monkey
35
# Monkey-patch everything for gevent early on
36
gevent.monkey.patch_all()
37

  
38 34
from argparse import ArgumentParser
39 35
import unittest
40 36
import time
b/setup.py
34 34
# or implied, of GRNET S.A.
35 35

  
36 36
from setuptools import setup
37
#from sys import version_info
37
from sys import version_info
38 38

  
39 39
import kamaki
40 40

  
41
#Suggested packages can be installed manually later, but it is not nessecary
42
suggested = ['ansicolors==1.0.2', 'progress==1.0.1']
43
required = ['gevent>=0.13.6', 'snf-common>=0.10', 'argparse']
41

  
42
optional = ['ansicolors', 'progress']
43
required = ['snf-common>=0.10', 'argparse']
44 44

  
45 45
setup(
46 46
    name='kamaki',
47 47
    version=kamaki.__version__,
48
    description='A command-line tool for poking clouds',
48
    description='A command-line tool for managing clouds',
49 49
    long_description=open('README.rst').read(),
50 50
    url='http://code.grnet.gr/projects/kamaki',
51 51
    license='BSD',
52
    packages=['kamaki',
53
        'kamaki.cli',
54
        'kamaki.clients',
55
        'kamaki.clients.connection',
56
        'kamaki.cli.commands'],
52
    packages=['kamaki', 'kamaki.clients', 'kamaki.clients.connection', 'kamaki.cli', 'kamaki.cli.commands'],
57 53
    include_package_data=True,
58 54
    entry_points={
59 55
        'console_scripts': ['kamaki = kamaki.cli:main']

Also available in: Unified diff