Revision 4ab1af1a

b/snf-common/synnefo/lib/astakos.py
37 37
from urllib import unquote
38 38
from django.utils import simplejson as json
39 39

  
40
from synnefo.lib.pool.http import get_http_connection
40
from synnefo.lib.pool.http import PooledHTTPConnection
41 41

  
42 42
logger = logging.getLogger(__name__)
43 43

  
......
78 78
                                     'application/octet-stream')
79 79
    kwargs['headers'].setdefault('content-length', len(body) if body else 0)
80 80

  
81
    conn = get_http_connection(p.netloc, p.scheme)
82
    try:
81
    with PooledHTTPConnection(p.netloc, p.scheme) as conn:
83 82
        conn.request(method, p.path + '?' + p.query, **kwargs)
84 83
        response = conn.getresponse()
85 84
        headers = response.getheaders()
......
87 86
        length = response.getheader('content-length', None)
88 87
        data = response.read(length)
89 88
        status = int(response.status)
90
    finally:
91
        conn.close()
92 89

  
93 90
    if status < 200 or status >= 300:
94 91
        raise Exception(data, status)
b/snf-common/synnefo/lib/pool/__init__.py
196 196
        log.debug("PUT-AFTER: finished putting object %r back to pool %r",
197 197
                  obj, self)
198 198

  
199
    def pool_create_free(self):
200
        """Create a free new object that is not put into the pool.
201

  
202
        Just for convenience, let the users create objects with
203
        the exact same configuration as those that are used with the pool
204

  
205
        """
206
        obj = self._pool_create_free()
207
        return obj
208

  
209
    def _pool_create_free(self):
210
        """Create a free new object that is not put into the pool.
211

  
212
        This should be overriden by pool classes.
213
        Otherwise, it just calls _pool_create().
214

  
215
        """
216
        return self._pool_create()
217

  
199 218
    def _pool_create(self):
200 219
        """Create a new object to be used with this pool.
201 220

  
202 221
        Create a new object to be used with this pool,
203 222
        should be overriden in subclasses.
204 223
        Must be thread-safe.
224

  
205 225
        """
206 226
        raise NotImplementedError
207 227

  
......
226 246

  
227 247
        """
228 248
        raise NotImplementedError
249

  
250

  
251
class PooledObject(object):
252
    """Generic Object Pool context manager and pooled object proxy.
253

  
254
    The PooledObject instance acts as a context manager to
255
    be used in a with statement:
256

  
257
        with PooledObject(...) as obj:
258
            use(obj)
259

  
260
    The with block above is roughly equivalent to:
261

  
262
        pooled = PooledObject(...):
263
        try:
264
            obj = pooled.acquire()
265
            assert(obj is pooled.obj)
266
            use(obj)
267
        finally:
268
            pooled.release()
269

  
270
    After exiting the with block, or releasing,
271
    the code MUST not use the obj again in any way.
272

  
273
    """
274

  
275
    # NOTE: We need all definitions at class-level
276
    # to avoid infinite __gettatr__() recursion.
277
    # This is also true for subclasses.
278

  
279
    # NOTE: Typically you will only need to override
280
    #       __init__() and get_pool
281

  
282
    # Initialization. Do not customize.
283
    _pool_settings = None
284
    _pool_get_settings = None
285
    _pool_kwargs = None
286
    _pool = None
287
    obj = None
288

  
289
    #####################################################
290
    ### Subclass attribute customization begins here. ###
291

  
292
    _pool_log_prefix = "POOL"
293
    _pool_class = ObjectPool
294

  
295
    # default keyword args to pass to pool initialization
296
    _pool_default_settings = (
297
            ('size', 25),
298
        )
299

  
300
    # keyword args to pass to pool_get
301
    _pool_default_get_settings = (
302
            ('blocking', True),
303
            #('timeout', None),
304
            ('create', True),
305
            ('verify', True),
306
        )
307

  
308
    # behavior settings
309
    _pool_attach_context = False
310
    _pool_disable_after_release = True
311
    _pool_ignore_double_release = False
312

  
313
    ###  Subclass attribute customization ends here.  ###
314
    #####################################################
315

  
316
    def __init__(self, pool_settings=None,
317
                       get_settings=None,
318
                       attach_context=None,
319
                       disable_after_release=None,
320
                       ignore_double_release=None,
321
                       **kwargs):
322
        """Initialize a PooledObject instance.
323

  
324
        Accept only keyword arguments.
325
        Some of them are filtered for this instance's configuration,
326
        and the rest are saved in ._pool_kwargs for later use.
327

  
328
        The filtered keywords are:
329

  
330
        pool_settings:  keyword args forwarded to pool instance initialization
331
                        in get_pool(), on top of the class defaults.
332
                        If not given, the remaining keyword args are
333
                        forwarded instead.
334

  
335
        get_settings:   keyword args forwarded to the pool's .pool_get() on top
336
                        of the class defaults.
337

  
338
        attach_context: boolean overriding the class default.
339
                        If True, after getting an object from the pool,
340
                        attach self onto it before returning it,
341
                        so that the context manager caller can have
342
                        access to the manager object within the with: block.
343

  
344
        disable_after_release:
345
                        boolean overriding the class default.
346
                        If True, the PooledObject will not allow a second
347
                        acquisition after the first release. For example,
348
                        the second with will raise an AssertionError:
349
                        manager = PooledObject()
350
                        with manager as c:
351
                            pass
352
                        with manager as c:
353
                            pass
354

  
355
        ignore_double_release:
356
                        boolean overriding the class default.
357
                        If True, the PooledObject will allow consecutive
358
                        calls to release the underlying pooled object.
359
                        Only the first one has an effect.
360
                        If False, an AssertionError is raised.
361

  
362
        """
363
        self._pool_kwargs = kwargs
364
        self._pool = None
365
        self.obj = None
366

  
367
        _get_settings = dict(self._pool_default_get_settings)
368
        if get_settings is not None:
369
            _get_settings.update(get_settings)
370
        self._pool_get_settings = _get_settings
371

  
372
        if attach_context is not None:
373
            self._pool_attach_context = attach_context
374

  
375
        if pool_settings is None:
376
            pool_settings = kwargs
377

  
378
        _pool_settings = dict(self._pool_default_settings)
379
        _pool_settings.update(**pool_settings)
380
        self._pool_settings = _pool_settings
381

  
382
    def get_pool(self):
383
        """Return a suitable pool object to work with.
384

  
385
        Called within .acquire(), it is meant to be
386
        overriden by sublasses, to create a new pool,
387
        or retrieve an existing one, based on the PooledObject
388
        initialization keywords stored in self._pool_kwargs.
389

  
390
        """
391
        pool = self._pool_class(**self._pool_settings)
392
        return pool
393

  
394
    ### Maybe overriding get_pool() and __init__() above is enough ###
395

  
396
    def __repr__(self):
397
        return ("<object %s of class %s: "
398
                "proxy for object (%r) in pool (%r)>" % (
399
                id(self), self.__class__.__name__,
400
                self.obj, self._pool))
401

  
402
    __str__ = __repr__
403

  
404
    ## Proxy the real object. Disabled until needed.
405
    ##
406
    ##def __getattr__(self, name):
407
    ##    return getattr(self.obj, name)
408

  
409
    ##def __setattr__(self, name, value):
410
    ##    if hasattr(self, name):
411
    ##        _setattr = super(PooledObject, self).__setattr__
412
    ##        _setattr(name, value)
413
    ##    else:
414
    ##        setattr(self.obj, name, value)
415

  
416
    ##def __delattr_(self, name):
417
    ##    _delattr = super(PooledObject, self).__delattr__
418
    ##    if hasattr(self, name):
419
    ##        _delattr(self, name)
420
    ##    else:
421
    ##        delattr(self.obj, name)
422

  
423
    def __enter__(self):
424
        return self.acquire()
425

  
426
    def __exit__(self, exc_type, exc_value, trace):
427
        return self.release()
428

  
429
    def acquire(self):
430
        log.debug("%s Acquiring (context: %r)", self._pool_log_prefix, self)
431
        pool = self._pool
432
        if pool is False:
433
            m = "%r: has been released. No further pool access allowed." % (
434
                self,)
435
            raise AssertionError(m)
436
        if pool is not None:
437
            m = "Double acquire in %r" % self
438
            raise AssertionError(m)
439

  
440
        pool = self.get_pool()
441
        self._pool = pool
442

  
443
        obj = pool.pool_get(**self._pool_get_settings)
444
        if self._pool_attach_context:
445
            obj._pool_context = self
446
        self.obj = obj
447
        log.debug("%s Acquired %r", self._pool_log_prefix, obj)
448
        return obj
449

  
450
    def release(self):
451
        log.debug("%s Releasing (context: %r)", self._pool_log_prefix, self)
452
        pool = self._pool
453
        if pool is None:
454
            m = "%r: no pool" % (self,)
455
            raise AssertionError(m)
456

  
457
        obj = self.obj
458
        if obj is None:
459
            if self._pool_ignore_double_release:
460
                return
461
            m = "%r: no object. Double release?" % (self,)
462
            raise AssertionError(m)
463

  
464
        pool.pool_put(obj)
465
        self.obj = None
466
        if self._pool_disable_after_release:
467
            self._pool = False
b/snf-common/synnefo/lib/pool/http.py
31 31
# interpreted as representing official policies, either expressed
32 32
# or implied, of GRNET S.A.
33 33

  
34
from synnefo.lib.pool import ObjectPool
34
from synnefo.lib.pool import ObjectPool, PooledObject
35 35
from select import select
36 36

  
37 37
from httplib import (
......
40 40
    ResponseNotReady,
41 41
)
42 42

  
43
from new import instancemethod
43
from threading import Lock
44 44

  
45 45
import logging
46 46

  
47 47
log = logging.getLogger(__name__)
48 48

  
49 49
_pools = {}
50
pool_size = 8
51

  
50
_pools_mutex = Lock()
52 51

  
52
default_pool_size = 100
53 53
USAGE_LIMIT = 1000
54 54

  
55 55

  
56 56
def init_http_pooling(size):
57
    global pool_size
58
    pool_size = size
59

  
60

  
61
def put_http_connection(conn):
62
    pool = conn._pool
63
    log.debug("HTTP-PUT-BEFORE: putting connection %r back to pool %r",
64
              conn, pool)
65
    if pool is None:
66
        log.debug("HTTP-PUT: connection %r does not have a pool", conn)
67
        return
68
    conn._pool = None
69
    pool.pool_put(conn)
57
    global default_pool_size
58
    default_pool_size = size
70 59

  
71 60

  
72 61
class HTTPConnectionPool(ObjectPool):
......
93 82
    def _pool_create(self):
94 83
        log.debug("CREATE-HTTP-BEFORE from pool %r", self)
95 84
        conn = self.connection_class(self.netloc)
96
        conn._use_counter = USAGE_LIMIT
97
        conn._pool = self
98
        conn._real_close = conn.close
99
        conn.close = instancemethod(put_http_connection, conn, type(conn))
85
        conn._pool_use_counter = USAGE_LIMIT
100 86
        return conn
101 87

  
102 88
    def _pool_verify(self, conn):
103 89
        log.debug("VERIFY-HTTP")
104
        # _pool verify is called at every pool_get().
105
        # Make sure this connection obj is associated with the proper pool.
106
        # The association is broken by put_http_connection(), to prevent
107
        # a connection object from being returned to the pool twice,
108
        # on duplicate invocations of conn.close().
109 90
        if conn is None:
110 91
            return False
111
        if not conn._pool:
112
            conn._pool = self
113 92
        sock = conn.sock
114 93
        if sock is None:
115 94
            return True
......
120 99
    def _pool_cleanup(self, conn):
121 100
        log.debug("CLEANUP-HTTP")
122 101
        # every connection can be used a finite number of times
123
        conn._use_counter -= 1
102
        conn._pool_use_counter -= 1
124 103

  
125 104
        # see httplib source for connection states documentation
126
        if conn._use_counter > 0 and conn._HTTPConnection__state == 'Idle':
105
        if (conn._pool_use_counter > 0 and
106
            conn._HTTPConnection__state == 'Idle'):
127 107
            try:
128 108
                conn.getresponse()
129 109
            except ResponseNotReady:
......
131 111
                return False
132 112

  
133 113
        log.debug("CLEANUP-HTTP: Closing connection. Will not reuse.")
134
        conn._real_close()
114
        conn.close()
135 115
        return True
136 116

  
137 117

  
138
def get_http_connection(netloc=None, scheme='http', pool_size=pool_size):
139
    log.debug("HTTP-GET: Getting HTTP connection")
140
    if netloc is None:
141
        m = "netloc cannot be None"
142
        raise ValueError(m)
143
    # does the pool need to be created?
144
    # ensure distinct pools are created for every (scheme, netloc) combination
145
    key = (scheme, netloc)
146
    if key not in _pools:
147
        log.debug("HTTP-GET: Creating pool for key %s", key)
148
        pool = HTTPConnectionPool(scheme, netloc, size=pool_size)
149
        _pools[key] = pool
150

  
151
    obj = _pools[key].pool_get()
152
    log.debug("HTTP-GET: Returning object %r", obj)
153
    return obj
118
class PooledHTTPConnection(PooledObject):
119

  
120
    _pool_log_prefix = "HTTP"
121
    _pool_class = HTTPConnectionPool
122

  
123
    def __init__(self, netloc, scheme='http', pool=None, **kw):
124
        kw['netloc'] = netloc
125
        kw['scheme'] = scheme
126
        kw['pool'] = pool
127
        super(PooledHTTPConnection, self).__init__(**kw)
128

  
129
    def get_pool(self):
130
        kwargs = self._pool_kwargs
131
        pool = kwargs.pop('pool', None)
132
        if pool is not None:
133
            return pool
134

  
135
        # pool was not given, find one from the global registry
136
        scheme = kwargs['scheme']
137
        netloc = kwargs['netloc']
138
        size = kwargs.get('size', default_pool_size)
139
        # ensure distinct pools for every (scheme, netloc) combination
140
        key = (scheme, netloc)
141
        with _pools_mutex:
142
            if key not in _pools:
143
                log.debug("HTTP-GET: Creating pool for key %s", key)
144
                pool = HTTPConnectionPool(scheme, netloc, size=size)
145
                _pools[key] = pool
146
            else:
147
                pool = _pools[key]
148

  
149
        return pool
b/snf-common/synnefo/lib/pool/tests.py
57 57
import threading
58 58
from collections import defaultdict
59 59

  
60
from socket import socket, AF_INET, SOCK_STREAM, IPPROTO_TCP, SHUT_RDWR
61

  
60 62
from synnefo.lib.pool import ObjectPool, PoolLimitError, PoolVerificationError
61
from synnefo.lib.pool.http import get_http_connection
63
from synnefo.lib.pool.http import PooledHTTPConnection, HTTPConnectionPool
62 64
from synnefo.lib.pool.http import _pools as _http_pools
63 65

  
64 66
# Use backported unittest functionality if Python < 2.7
......
291 293

  
292 294

  
293 295
class TestHTTPConnectionTestCase(unittest.TestCase):
294
    def test_double_close(self):
295
        conn = get_http_connection("127.0.0.1", "http")
296
        self.assertEqual(conn._pool, _http_pools[("http", "127.0.0.1")])
297
        conn.close()
298
        self.assertIsNone(conn._pool)
299
        # This call does nothing, because conn._pool is already None
300
        conn.close()
301
        self.assertIsNone(conn._pool)
296
    def setUp(self):
297
        #netloc = "127.0.0.1:9999"
298
        #scheme='http'
299
        #self.pool = HTTPConnectionPool(
300
        #                netloc=netloc,
301
        #                scheme=scheme,
302
        #                pool_size=1)
303
        #key = (scheme, netloc)
304
        #_http_pools[key] = pool
305

  
306
        _http_pools.clear()
307

  
308
        self.host = "127.0.0.1"
309
        self.port = 9999
310
        self.netloc = "%s:%s" % (self.host, self.port)
311
        self.scheme = "http"
312
        self.key = (self.scheme, self.netloc)
313

  
314
        sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
315
        sock.bind((self.host, self.port))
316
        sock.listen(1)
317
        self.sock = sock
318

  
319
    def tearDown(self):
320
        sock = self.sock
321
        sock.shutdown(SHUT_RDWR)
322
        sock.close()
323

  
324
    def test_double_release(self):
325
        pooled = PooledHTTPConnection(self.netloc, self.scheme)
326
        pooled.acquire()
327
        pool = pooled._pool
328
        self.assertTrue(pooled._pool is _http_pools[(self.scheme, self.netloc)])
329
        pooled.release()
330

  
331
        poolsize = len(pool._set)
332

  
333
        if PooledHTTPConnection._pool_disable_after_release:
334
            self.assertTrue(pooled._pool is False)
335

  
336
        if not PooledHTTPConnection._pool_ignore_double_release:
337
            with self.assertRaises(AssertionError):
338
                pooled.release()
339
        else:
340
            pooled.release()
341

  
342
        self.assertEqual(poolsize, len(pool._set))
302 343

  
303 344
    def test_distinct_pools_per_scheme(self):
304
        conn = get_http_connection("127.0.0.1", "http")
305
        pool = conn._pool
306
        self.assertTrue(pool is _http_pools[("http", "127.0.0.1")])
307
        conn.close()
308
        conn2 = get_http_connection("127.0.0.1", "https")
309
        self.assertTrue(conn is not conn2)
310
        self.assertNotEqual(pool, conn2._pool)
311
        self.assertTrue(conn2._pool is _http_pools[("https", "127.0.0.1")])
312
        conn2.close()
345
        with PooledHTTPConnection("127.0.0.1", "http",
346
                                  attach_context=True) as conn:
347
            pool = conn._pool_context._pool
348
            self.assertTrue(pool is _http_pools[("http", "127.0.0.1")])
349

  
350
        with PooledHTTPConnection("127.0.0.1", "https",
351
                                  attach_context=True) as conn2:
352
            pool2 = conn2._pool_context._pool
353
            self.assertTrue(conn is not conn2)
354
            self.assertNotEqual(pool, pool2)
355
            self.assertTrue(pool2 is _http_pools[("https", "127.0.0.1")])
356

  
357
    def test_clean_connection(self):
358
        pool = None
359
        pooled = PooledHTTPConnection(self.netloc, self.scheme)
360
        conn = pooled.acquire()
361
        pool = pooled._pool
362
        self.assertTrue(pool is not None)
363
        pooled.release()
364
        self.assertTrue(pooled._pool is False)
365
        poolset = pool._set
366
        self.assertEqual(len(poolset), 1)
367
        pooled_conn = list(poolset)[0]
368
        self.assertTrue(pooled_conn is conn)
369

  
370
    def test_dirty_connection(self):
371
        pooled = PooledHTTPConnection(self.netloc, self.scheme)
372
        conn = pooled.acquire()
373
        pool = pooled._pool
374
        conn.request("GET", "/")
375
        serversock, addr = self.sock.accept()
376
        serversock.send("HTTP/1.1 200 OK\n"
377
                        "Content-Length: 6\n"
378
                        "\n"
379
                        "HELLO\n")
380
        time.sleep(0.3)
381
        # We would read this message like this
382
        #resp = conn.getresponse()
383
        # but we won't so the connection is dirty
384
        pooled.release()
385

  
386
        poolset = pool._set
387
        self.assertEqual(len(poolset), 0)
388

  
389
    def test_context_manager_exception_safety(self):
390
        class TestError(Exception):
391
            pass
392

  
393
        for i in xrange(10):
394
            pool = None
395
            try:
396
                with PooledHTTPConnection(
397
                        self.netloc, self.scheme,
398
                        size=1, attach_context=True) as conn:
399
                    pool = conn._pool_context._pool
400
                    raise TestError()
401
            except TestError:
402
                self.assertTrue(pool is not None)
403
                self.assertEqual(pool._semaphore._Semaphore__value, 1)
404

  
313 405

  
314 406
if __name__ == '__main__':
315 407
    unittest.main()
b/snf-common/synnefo/lib/quotaholder/http.py
32 32
# or implied, of GRNET S.A.
33 33

  
34 34
from synnefo.lib.commissioning import Callpoint, CallError
35
from synnefo.lib.pool.http import get_http_connection
35
from synnefo.lib.pool.http import PooledHTTPConnection
36 36
from .api import QuotaholderAPI
37 37

  
38 38
from json import loads as json_loads, dumps as json_dumps
......
70 70

  
71 71
        logger.debug("%s %s\n%s\n<<<\n", method, path, json_data[:128])
72 72
        headers = {'X-Auth-Token': self._token}
73
        conn = get_http_connection(scheme=self._scheme, netloc=self._netloc,
74
                                   pool_size=self._poolsize)
75
        try:
73
        with PooledHTTPConnection(scheme=self._scheme,
74
                                  netloc=self._netloc,
75
                                  size=self._poolsize) as conn:
76 76
            conn.request(method, path, body=json_data, headers=headers)
77 77
            resp = conn.getresponse()
78
        finally:
79
            conn.close()
78
            body = resp.read()
80 79

  
81 80
        logger.debug(">>>\nStatus: %s", resp.status)
82

  
83
        body = resp.read()
84 81
        logger.debug("\n%s\n<<<\n", body[:128] if body else None)
85 82

  
86 83
        status = int(resp.status)
b/snf-cyclades-app/synnefo/api/delegate.py
42 42
USER_CATALOG_URL = getattr(settings, 'CYCLADES_USER_CATALOG_URL', None)
43 43
USER_FEEDBACK_URL = getattr(settings, 'CYCLADES_USER_FEEDBACK_URL', None)
44 44

  
45
from synnefo.lib.pool.http import get_http_connection
45
from synnefo.lib.pool.http import PooledHTTPConnection
46 46

  
47 47
logger = logging.getLogger(__name__)
48 48

  
......
57 57
    kwargs['headers'].setdefault('content-type', 'application/json')
58 58
    kwargs['headers'].setdefault('content-length', len(body) if body else 0)
59 59

  
60
    conn = get_http_connection(p.netloc, p.scheme)
61
    try:
60
    with PooledHTTPConnection(p.netloc, p.scheme) as conn:
62 61
        conn.request(request.method, p.path + '?' + p.query, **kwargs)
63 62
        response = conn.getresponse()
64 63
        length = response.getheader('content-length', None)
65 64
        data = response.read(length)
66 65
        status = int(response.status)
67 66
        return HttpResponse(data, status=status)
68
    finally:
69
        conn.close()
70 67

  
71 68

  
72 69
@csrf_exempt
b/snf-pithos-app/pithos/api/delegate.py
46 46
from pithos.api.settings import (
47 47
    AUTHENTICATION_USERS, USER_LOGIN_URL, USER_FEEDBACK_URL, USER_CATALOG_URL)
48 48

  
49
from synnefo.lib.pool.http import get_http_connection
49
from synnefo.lib.pool.http import PooledHTTPConnection
50 50

  
51 51
logger = logging.getLogger(__name__)
52 52

  
......
79 79
    kwargs['headers'].setdefault('content-type', 'application/json')
80 80
    kwargs['headers'].setdefault('content-length', len(body) if body else 0)
81 81

  
82
    conn = get_http_connection(p.netloc, p.scheme)
83
    try:
82
    with PooledHTTPConnection(p.netloc, p.scheme) as conn:
84 83
        conn.request(request.method, p.path + '?' + p.query, **kwargs)
85 84
        response = conn.getresponse()
86 85
        length = response.getheader('content-length', None)
87 86
        data = response.read(length)
88 87
        status = int(response.status)
89 88
        return HttpResponse(data, status=status)
90
    finally:
91
        conn.close()
92 89

  
93 90
@csrf_exempt
94 91
def delegate_to_feedback_service(request):

Also available in: Unified diff