Revision 4ab1af1a
b/snf-common/synnefo/lib/astakos.py | ||
---|---|---|
37 | 37 |
from urllib import unquote |
38 | 38 |
from django.utils import simplejson as json |
39 | 39 |
|
40 |
from synnefo.lib.pool.http import get_http_connection
|
|
40 |
from synnefo.lib.pool.http import PooledHTTPConnection
|
|
41 | 41 |
|
42 | 42 |
logger = logging.getLogger(__name__) |
43 | 43 |
|
... | ... | |
78 | 78 |
'application/octet-stream') |
79 | 79 |
kwargs['headers'].setdefault('content-length', len(body) if body else 0) |
80 | 80 |
|
81 |
conn = get_http_connection(p.netloc, p.scheme) |
|
82 |
try: |
|
81 |
with PooledHTTPConnection(p.netloc, p.scheme) as conn: |
|
83 | 82 |
conn.request(method, p.path + '?' + p.query, **kwargs) |
84 | 83 |
response = conn.getresponse() |
85 | 84 |
headers = response.getheaders() |
... | ... | |
87 | 86 |
length = response.getheader('content-length', None) |
88 | 87 |
data = response.read(length) |
89 | 88 |
status = int(response.status) |
90 |
finally: |
|
91 |
conn.close() |
|
92 | 89 |
|
93 | 90 |
if status < 200 or status >= 300: |
94 | 91 |
raise Exception(data, status) |
b/snf-common/synnefo/lib/pool/__init__.py | ||
---|---|---|
196 | 196 |
log.debug("PUT-AFTER: finished putting object %r back to pool %r", |
197 | 197 |
obj, self) |
198 | 198 |
|
199 |
def pool_create_free(self): |
|
200 |
"""Create a free new object that is not put into the pool. |
|
201 |
|
|
202 |
Just for convenience, let the users create objects with |
|
203 |
the exact same configuration as those that are used with the pool |
|
204 |
|
|
205 |
""" |
|
206 |
obj = self._pool_create_free() |
|
207 |
return obj |
|
208 |
|
|
209 |
def _pool_create_free(self): |
|
210 |
"""Create a free new object that is not put into the pool. |
|
211 |
|
|
212 |
This should be overriden by pool classes. |
|
213 |
Otherwise, it just calls _pool_create(). |
|
214 |
|
|
215 |
""" |
|
216 |
return self._pool_create() |
|
217 |
|
|
199 | 218 |
def _pool_create(self): |
200 | 219 |
"""Create a new object to be used with this pool. |
201 | 220 |
|
202 | 221 |
Create a new object to be used with this pool, |
203 | 222 |
should be overriden in subclasses. |
204 | 223 |
Must be thread-safe. |
224 |
|
|
205 | 225 |
""" |
206 | 226 |
raise NotImplementedError |
207 | 227 |
|
... | ... | |
226 | 246 |
|
227 | 247 |
""" |
228 | 248 |
raise NotImplementedError |
249 |
|
|
250 |
|
|
251 |
class PooledObject(object): |
|
252 |
"""Generic Object Pool context manager and pooled object proxy. |
|
253 |
|
|
254 |
The PooledObject instance acts as a context manager to |
|
255 |
be used in a with statement: |
|
256 |
|
|
257 |
with PooledObject(...) as obj: |
|
258 |
use(obj) |
|
259 |
|
|
260 |
The with block above is roughly equivalent to: |
|
261 |
|
|
262 |
pooled = PooledObject(...): |
|
263 |
try: |
|
264 |
obj = pooled.acquire() |
|
265 |
assert(obj is pooled.obj) |
|
266 |
use(obj) |
|
267 |
finally: |
|
268 |
pooled.release() |
|
269 |
|
|
270 |
After exiting the with block, or releasing, |
|
271 |
the code MUST not use the obj again in any way. |
|
272 |
|
|
273 |
""" |
|
274 |
|
|
275 |
# NOTE: We need all definitions at class-level |
|
276 |
# to avoid infinite __gettatr__() recursion. |
|
277 |
# This is also true for subclasses. |
|
278 |
|
|
279 |
# NOTE: Typically you will only need to override |
|
280 |
# __init__() and get_pool |
|
281 |
|
|
282 |
# Initialization. Do not customize. |
|
283 |
_pool_settings = None |
|
284 |
_pool_get_settings = None |
|
285 |
_pool_kwargs = None |
|
286 |
_pool = None |
|
287 |
obj = None |
|
288 |
|
|
289 |
##################################################### |
|
290 |
### Subclass attribute customization begins here. ### |
|
291 |
|
|
292 |
_pool_log_prefix = "POOL" |
|
293 |
_pool_class = ObjectPool |
|
294 |
|
|
295 |
# default keyword args to pass to pool initialization |
|
296 |
_pool_default_settings = ( |
|
297 |
('size', 25), |
|
298 |
) |
|
299 |
|
|
300 |
# keyword args to pass to pool_get |
|
301 |
_pool_default_get_settings = ( |
|
302 |
('blocking', True), |
|
303 |
#('timeout', None), |
|
304 |
('create', True), |
|
305 |
('verify', True), |
|
306 |
) |
|
307 |
|
|
308 |
# behavior settings |
|
309 |
_pool_attach_context = False |
|
310 |
_pool_disable_after_release = True |
|
311 |
_pool_ignore_double_release = False |
|
312 |
|
|
313 |
### Subclass attribute customization ends here. ### |
|
314 |
##################################################### |
|
315 |
|
|
316 |
def __init__(self, pool_settings=None, |
|
317 |
get_settings=None, |
|
318 |
attach_context=None, |
|
319 |
disable_after_release=None, |
|
320 |
ignore_double_release=None, |
|
321 |
**kwargs): |
|
322 |
"""Initialize a PooledObject instance. |
|
323 |
|
|
324 |
Accept only keyword arguments. |
|
325 |
Some of them are filtered for this instance's configuration, |
|
326 |
and the rest are saved in ._pool_kwargs for later use. |
|
327 |
|
|
328 |
The filtered keywords are: |
|
329 |
|
|
330 |
pool_settings: keyword args forwarded to pool instance initialization |
|
331 |
in get_pool(), on top of the class defaults. |
|
332 |
If not given, the remaining keyword args are |
|
333 |
forwarded instead. |
|
334 |
|
|
335 |
get_settings: keyword args forwarded to the pool's .pool_get() on top |
|
336 |
of the class defaults. |
|
337 |
|
|
338 |
attach_context: boolean overriding the class default. |
|
339 |
If True, after getting an object from the pool, |
|
340 |
attach self onto it before returning it, |
|
341 |
so that the context manager caller can have |
|
342 |
access to the manager object within the with: block. |
|
343 |
|
|
344 |
disable_after_release: |
|
345 |
boolean overriding the class default. |
|
346 |
If True, the PooledObject will not allow a second |
|
347 |
acquisition after the first release. For example, |
|
348 |
the second with will raise an AssertionError: |
|
349 |
manager = PooledObject() |
|
350 |
with manager as c: |
|
351 |
pass |
|
352 |
with manager as c: |
|
353 |
pass |
|
354 |
|
|
355 |
ignore_double_release: |
|
356 |
boolean overriding the class default. |
|
357 |
If True, the PooledObject will allow consecutive |
|
358 |
calls to release the underlying pooled object. |
|
359 |
Only the first one has an effect. |
|
360 |
If False, an AssertionError is raised. |
|
361 |
|
|
362 |
""" |
|
363 |
self._pool_kwargs = kwargs |
|
364 |
self._pool = None |
|
365 |
self.obj = None |
|
366 |
|
|
367 |
_get_settings = dict(self._pool_default_get_settings) |
|
368 |
if get_settings is not None: |
|
369 |
_get_settings.update(get_settings) |
|
370 |
self._pool_get_settings = _get_settings |
|
371 |
|
|
372 |
if attach_context is not None: |
|
373 |
self._pool_attach_context = attach_context |
|
374 |
|
|
375 |
if pool_settings is None: |
|
376 |
pool_settings = kwargs |
|
377 |
|
|
378 |
_pool_settings = dict(self._pool_default_settings) |
|
379 |
_pool_settings.update(**pool_settings) |
|
380 |
self._pool_settings = _pool_settings |
|
381 |
|
|
382 |
def get_pool(self): |
|
383 |
"""Return a suitable pool object to work with. |
|
384 |
|
|
385 |
Called within .acquire(), it is meant to be |
|
386 |
overriden by sublasses, to create a new pool, |
|
387 |
or retrieve an existing one, based on the PooledObject |
|
388 |
initialization keywords stored in self._pool_kwargs. |
|
389 |
|
|
390 |
""" |
|
391 |
pool = self._pool_class(**self._pool_settings) |
|
392 |
return pool |
|
393 |
|
|
394 |
### Maybe overriding get_pool() and __init__() above is enough ### |
|
395 |
|
|
396 |
def __repr__(self): |
|
397 |
return ("<object %s of class %s: " |
|
398 |
"proxy for object (%r) in pool (%r)>" % ( |
|
399 |
id(self), self.__class__.__name__, |
|
400 |
self.obj, self._pool)) |
|
401 |
|
|
402 |
__str__ = __repr__ |
|
403 |
|
|
404 |
## Proxy the real object. Disabled until needed. |
|
405 |
## |
|
406 |
##def __getattr__(self, name): |
|
407 |
## return getattr(self.obj, name) |
|
408 |
|
|
409 |
##def __setattr__(self, name, value): |
|
410 |
## if hasattr(self, name): |
|
411 |
## _setattr = super(PooledObject, self).__setattr__ |
|
412 |
## _setattr(name, value) |
|
413 |
## else: |
|
414 |
## setattr(self.obj, name, value) |
|
415 |
|
|
416 |
##def __delattr_(self, name): |
|
417 |
## _delattr = super(PooledObject, self).__delattr__ |
|
418 |
## if hasattr(self, name): |
|
419 |
## _delattr(self, name) |
|
420 |
## else: |
|
421 |
## delattr(self.obj, name) |
|
422 |
|
|
423 |
def __enter__(self): |
|
424 |
return self.acquire() |
|
425 |
|
|
426 |
def __exit__(self, exc_type, exc_value, trace): |
|
427 |
return self.release() |
|
428 |
|
|
429 |
def acquire(self): |
|
430 |
log.debug("%s Acquiring (context: %r)", self._pool_log_prefix, self) |
|
431 |
pool = self._pool |
|
432 |
if pool is False: |
|
433 |
m = "%r: has been released. No further pool access allowed." % ( |
|
434 |
self,) |
|
435 |
raise AssertionError(m) |
|
436 |
if pool is not None: |
|
437 |
m = "Double acquire in %r" % self |
|
438 |
raise AssertionError(m) |
|
439 |
|
|
440 |
pool = self.get_pool() |
|
441 |
self._pool = pool |
|
442 |
|
|
443 |
obj = pool.pool_get(**self._pool_get_settings) |
|
444 |
if self._pool_attach_context: |
|
445 |
obj._pool_context = self |
|
446 |
self.obj = obj |
|
447 |
log.debug("%s Acquired %r", self._pool_log_prefix, obj) |
|
448 |
return obj |
|
449 |
|
|
450 |
def release(self): |
|
451 |
log.debug("%s Releasing (context: %r)", self._pool_log_prefix, self) |
|
452 |
pool = self._pool |
|
453 |
if pool is None: |
|
454 |
m = "%r: no pool" % (self,) |
|
455 |
raise AssertionError(m) |
|
456 |
|
|
457 |
obj = self.obj |
|
458 |
if obj is None: |
|
459 |
if self._pool_ignore_double_release: |
|
460 |
return |
|
461 |
m = "%r: no object. Double release?" % (self,) |
|
462 |
raise AssertionError(m) |
|
463 |
|
|
464 |
pool.pool_put(obj) |
|
465 |
self.obj = None |
|
466 |
if self._pool_disable_after_release: |
|
467 |
self._pool = False |
b/snf-common/synnefo/lib/pool/http.py | ||
---|---|---|
31 | 31 |
# interpreted as representing official policies, either expressed |
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 |
from synnefo.lib.pool import ObjectPool |
|
34 |
from synnefo.lib.pool import ObjectPool, PooledObject
|
|
35 | 35 |
from select import select |
36 | 36 |
|
37 | 37 |
from httplib import ( |
... | ... | |
40 | 40 |
ResponseNotReady, |
41 | 41 |
) |
42 | 42 |
|
43 |
from new import instancemethod
|
|
43 |
from threading import Lock
|
|
44 | 44 |
|
45 | 45 |
import logging |
46 | 46 |
|
47 | 47 |
log = logging.getLogger(__name__) |
48 | 48 |
|
49 | 49 |
_pools = {} |
50 |
pool_size = 8 |
|
51 |
|
|
50 |
_pools_mutex = Lock() |
|
52 | 51 |
|
52 |
default_pool_size = 100 |
|
53 | 53 |
USAGE_LIMIT = 1000 |
54 | 54 |
|
55 | 55 |
|
56 | 56 |
def init_http_pooling(size): |
57 |
global pool_size |
|
58 |
pool_size = size |
|
59 |
|
|
60 |
|
|
61 |
def put_http_connection(conn): |
|
62 |
pool = conn._pool |
|
63 |
log.debug("HTTP-PUT-BEFORE: putting connection %r back to pool %r", |
|
64 |
conn, pool) |
|
65 |
if pool is None: |
|
66 |
log.debug("HTTP-PUT: connection %r does not have a pool", conn) |
|
67 |
return |
|
68 |
conn._pool = None |
|
69 |
pool.pool_put(conn) |
|
57 |
global default_pool_size |
|
58 |
default_pool_size = size |
|
70 | 59 |
|
71 | 60 |
|
72 | 61 |
class HTTPConnectionPool(ObjectPool): |
... | ... | |
93 | 82 |
def _pool_create(self): |
94 | 83 |
log.debug("CREATE-HTTP-BEFORE from pool %r", self) |
95 | 84 |
conn = self.connection_class(self.netloc) |
96 |
conn._use_counter = USAGE_LIMIT |
|
97 |
conn._pool = self |
|
98 |
conn._real_close = conn.close |
|
99 |
conn.close = instancemethod(put_http_connection, conn, type(conn)) |
|
85 |
conn._pool_use_counter = USAGE_LIMIT |
|
100 | 86 |
return conn |
101 | 87 |
|
102 | 88 |
def _pool_verify(self, conn): |
103 | 89 |
log.debug("VERIFY-HTTP") |
104 |
# _pool verify is called at every pool_get(). |
|
105 |
# Make sure this connection obj is associated with the proper pool. |
|
106 |
# The association is broken by put_http_connection(), to prevent |
|
107 |
# a connection object from being returned to the pool twice, |
|
108 |
# on duplicate invocations of conn.close(). |
|
109 | 90 |
if conn is None: |
110 | 91 |
return False |
111 |
if not conn._pool: |
|
112 |
conn._pool = self |
|
113 | 92 |
sock = conn.sock |
114 | 93 |
if sock is None: |
115 | 94 |
return True |
... | ... | |
120 | 99 |
def _pool_cleanup(self, conn): |
121 | 100 |
log.debug("CLEANUP-HTTP") |
122 | 101 |
# every connection can be used a finite number of times |
123 |
conn._use_counter -= 1 |
|
102 |
conn._pool_use_counter -= 1
|
|
124 | 103 |
|
125 | 104 |
# see httplib source for connection states documentation |
126 |
if conn._use_counter > 0 and conn._HTTPConnection__state == 'Idle': |
|
105 |
if (conn._pool_use_counter > 0 and |
|
106 |
conn._HTTPConnection__state == 'Idle'): |
|
127 | 107 |
try: |
128 | 108 |
conn.getresponse() |
129 | 109 |
except ResponseNotReady: |
... | ... | |
131 | 111 |
return False |
132 | 112 |
|
133 | 113 |
log.debug("CLEANUP-HTTP: Closing connection. Will not reuse.") |
134 |
conn._real_close()
|
|
114 |
conn.close() |
|
135 | 115 |
return True |
136 | 116 |
|
137 | 117 |
|
138 |
def get_http_connection(netloc=None, scheme='http', pool_size=pool_size): |
|
139 |
log.debug("HTTP-GET: Getting HTTP connection") |
|
140 |
if netloc is None: |
|
141 |
m = "netloc cannot be None" |
|
142 |
raise ValueError(m) |
|
143 |
# does the pool need to be created? |
|
144 |
# ensure distinct pools are created for every (scheme, netloc) combination |
|
145 |
key = (scheme, netloc) |
|
146 |
if key not in _pools: |
|
147 |
log.debug("HTTP-GET: Creating pool for key %s", key) |
|
148 |
pool = HTTPConnectionPool(scheme, netloc, size=pool_size) |
|
149 |
_pools[key] = pool |
|
150 |
|
|
151 |
obj = _pools[key].pool_get() |
|
152 |
log.debug("HTTP-GET: Returning object %r", obj) |
|
153 |
return obj |
|
118 |
class PooledHTTPConnection(PooledObject): |
|
119 |
|
|
120 |
_pool_log_prefix = "HTTP" |
|
121 |
_pool_class = HTTPConnectionPool |
|
122 |
|
|
123 |
def __init__(self, netloc, scheme='http', pool=None, **kw): |
|
124 |
kw['netloc'] = netloc |
|
125 |
kw['scheme'] = scheme |
|
126 |
kw['pool'] = pool |
|
127 |
super(PooledHTTPConnection, self).__init__(**kw) |
|
128 |
|
|
129 |
def get_pool(self): |
|
130 |
kwargs = self._pool_kwargs |
|
131 |
pool = kwargs.pop('pool', None) |
|
132 |
if pool is not None: |
|
133 |
return pool |
|
134 |
|
|
135 |
# pool was not given, find one from the global registry |
|
136 |
scheme = kwargs['scheme'] |
|
137 |
netloc = kwargs['netloc'] |
|
138 |
size = kwargs.get('size', default_pool_size) |
|
139 |
# ensure distinct pools for every (scheme, netloc) combination |
|
140 |
key = (scheme, netloc) |
|
141 |
with _pools_mutex: |
|
142 |
if key not in _pools: |
|
143 |
log.debug("HTTP-GET: Creating pool for key %s", key) |
|
144 |
pool = HTTPConnectionPool(scheme, netloc, size=size) |
|
145 |
_pools[key] = pool |
|
146 |
else: |
|
147 |
pool = _pools[key] |
|
148 |
|
|
149 |
return pool |
b/snf-common/synnefo/lib/pool/tests.py | ||
---|---|---|
57 | 57 |
import threading |
58 | 58 |
from collections import defaultdict |
59 | 59 |
|
60 |
from socket import socket, AF_INET, SOCK_STREAM, IPPROTO_TCP, SHUT_RDWR |
|
61 |
|
|
60 | 62 |
from synnefo.lib.pool import ObjectPool, PoolLimitError, PoolVerificationError |
61 |
from synnefo.lib.pool.http import get_http_connection
|
|
63 |
from synnefo.lib.pool.http import PooledHTTPConnection, HTTPConnectionPool
|
|
62 | 64 |
from synnefo.lib.pool.http import _pools as _http_pools |
63 | 65 |
|
64 | 66 |
# Use backported unittest functionality if Python < 2.7 |
... | ... | |
291 | 293 |
|
292 | 294 |
|
293 | 295 |
class TestHTTPConnectionTestCase(unittest.TestCase): |
294 |
def test_double_close(self): |
|
295 |
conn = get_http_connection("127.0.0.1", "http") |
|
296 |
self.assertEqual(conn._pool, _http_pools[("http", "127.0.0.1")]) |
|
297 |
conn.close() |
|
298 |
self.assertIsNone(conn._pool) |
|
299 |
# This call does nothing, because conn._pool is already None |
|
300 |
conn.close() |
|
301 |
self.assertIsNone(conn._pool) |
|
296 |
def setUp(self): |
|
297 |
#netloc = "127.0.0.1:9999" |
|
298 |
#scheme='http' |
|
299 |
#self.pool = HTTPConnectionPool( |
|
300 |
# netloc=netloc, |
|
301 |
# scheme=scheme, |
|
302 |
# pool_size=1) |
|
303 |
#key = (scheme, netloc) |
|
304 |
#_http_pools[key] = pool |
|
305 |
|
|
306 |
_http_pools.clear() |
|
307 |
|
|
308 |
self.host = "127.0.0.1" |
|
309 |
self.port = 9999 |
|
310 |
self.netloc = "%s:%s" % (self.host, self.port) |
|
311 |
self.scheme = "http" |
|
312 |
self.key = (self.scheme, self.netloc) |
|
313 |
|
|
314 |
sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) |
|
315 |
sock.bind((self.host, self.port)) |
|
316 |
sock.listen(1) |
|
317 |
self.sock = sock |
|
318 |
|
|
319 |
def tearDown(self): |
|
320 |
sock = self.sock |
|
321 |
sock.shutdown(SHUT_RDWR) |
|
322 |
sock.close() |
|
323 |
|
|
324 |
def test_double_release(self): |
|
325 |
pooled = PooledHTTPConnection(self.netloc, self.scheme) |
|
326 |
pooled.acquire() |
|
327 |
pool = pooled._pool |
|
328 |
self.assertTrue(pooled._pool is _http_pools[(self.scheme, self.netloc)]) |
|
329 |
pooled.release() |
|
330 |
|
|
331 |
poolsize = len(pool._set) |
|
332 |
|
|
333 |
if PooledHTTPConnection._pool_disable_after_release: |
|
334 |
self.assertTrue(pooled._pool is False) |
|
335 |
|
|
336 |
if not PooledHTTPConnection._pool_ignore_double_release: |
|
337 |
with self.assertRaises(AssertionError): |
|
338 |
pooled.release() |
|
339 |
else: |
|
340 |
pooled.release() |
|
341 |
|
|
342 |
self.assertEqual(poolsize, len(pool._set)) |
|
302 | 343 |
|
303 | 344 |
def test_distinct_pools_per_scheme(self): |
304 |
conn = get_http_connection("127.0.0.1", "http") |
|
305 |
pool = conn._pool |
|
306 |
self.assertTrue(pool is _http_pools[("http", "127.0.0.1")]) |
|
307 |
conn.close() |
|
308 |
conn2 = get_http_connection("127.0.0.1", "https") |
|
309 |
self.assertTrue(conn is not conn2) |
|
310 |
self.assertNotEqual(pool, conn2._pool) |
|
311 |
self.assertTrue(conn2._pool is _http_pools[("https", "127.0.0.1")]) |
|
312 |
conn2.close() |
|
345 |
with PooledHTTPConnection("127.0.0.1", "http", |
|
346 |
attach_context=True) as conn: |
|
347 |
pool = conn._pool_context._pool |
|
348 |
self.assertTrue(pool is _http_pools[("http", "127.0.0.1")]) |
|
349 |
|
|
350 |
with PooledHTTPConnection("127.0.0.1", "https", |
|
351 |
attach_context=True) as conn2: |
|
352 |
pool2 = conn2._pool_context._pool |
|
353 |
self.assertTrue(conn is not conn2) |
|
354 |
self.assertNotEqual(pool, pool2) |
|
355 |
self.assertTrue(pool2 is _http_pools[("https", "127.0.0.1")]) |
|
356 |
|
|
357 |
def test_clean_connection(self): |
|
358 |
pool = None |
|
359 |
pooled = PooledHTTPConnection(self.netloc, self.scheme) |
|
360 |
conn = pooled.acquire() |
|
361 |
pool = pooled._pool |
|
362 |
self.assertTrue(pool is not None) |
|
363 |
pooled.release() |
|
364 |
self.assertTrue(pooled._pool is False) |
|
365 |
poolset = pool._set |
|
366 |
self.assertEqual(len(poolset), 1) |
|
367 |
pooled_conn = list(poolset)[0] |
|
368 |
self.assertTrue(pooled_conn is conn) |
|
369 |
|
|
370 |
def test_dirty_connection(self): |
|
371 |
pooled = PooledHTTPConnection(self.netloc, self.scheme) |
|
372 |
conn = pooled.acquire() |
|
373 |
pool = pooled._pool |
|
374 |
conn.request("GET", "/") |
|
375 |
serversock, addr = self.sock.accept() |
|
376 |
serversock.send("HTTP/1.1 200 OK\n" |
|
377 |
"Content-Length: 6\n" |
|
378 |
"\n" |
|
379 |
"HELLO\n") |
|
380 |
time.sleep(0.3) |
|
381 |
# We would read this message like this |
|
382 |
#resp = conn.getresponse() |
|
383 |
# but we won't so the connection is dirty |
|
384 |
pooled.release() |
|
385 |
|
|
386 |
poolset = pool._set |
|
387 |
self.assertEqual(len(poolset), 0) |
|
388 |
|
|
389 |
def test_context_manager_exception_safety(self): |
|
390 |
class TestError(Exception): |
|
391 |
pass |
|
392 |
|
|
393 |
for i in xrange(10): |
|
394 |
pool = None |
|
395 |
try: |
|
396 |
with PooledHTTPConnection( |
|
397 |
self.netloc, self.scheme, |
|
398 |
size=1, attach_context=True) as conn: |
|
399 |
pool = conn._pool_context._pool |
|
400 |
raise TestError() |
|
401 |
except TestError: |
|
402 |
self.assertTrue(pool is not None) |
|
403 |
self.assertEqual(pool._semaphore._Semaphore__value, 1) |
|
404 |
|
|
313 | 405 |
|
314 | 406 |
if __name__ == '__main__': |
315 | 407 |
unittest.main() |
b/snf-common/synnefo/lib/quotaholder/http.py | ||
---|---|---|
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 | 34 |
from synnefo.lib.commissioning import Callpoint, CallError |
35 |
from synnefo.lib.pool.http import get_http_connection
|
|
35 |
from synnefo.lib.pool.http import PooledHTTPConnection
|
|
36 | 36 |
from .api import QuotaholderAPI |
37 | 37 |
|
38 | 38 |
from json import loads as json_loads, dumps as json_dumps |
... | ... | |
70 | 70 |
|
71 | 71 |
logger.debug("%s %s\n%s\n<<<\n", method, path, json_data[:128]) |
72 | 72 |
headers = {'X-Auth-Token': self._token} |
73 |
conn = get_http_connection(scheme=self._scheme, netloc=self._netloc,
|
|
74 |
pool_size=self._poolsize)
|
|
75 |
try:
|
|
73 |
with PooledHTTPConnection(scheme=self._scheme,
|
|
74 |
netloc=self._netloc,
|
|
75 |
size=self._poolsize) as conn:
|
|
76 | 76 |
conn.request(method, path, body=json_data, headers=headers) |
77 | 77 |
resp = conn.getresponse() |
78 |
finally: |
|
79 |
conn.close() |
|
78 |
body = resp.read() |
|
80 | 79 |
|
81 | 80 |
logger.debug(">>>\nStatus: %s", resp.status) |
82 |
|
|
83 |
body = resp.read() |
|
84 | 81 |
logger.debug("\n%s\n<<<\n", body[:128] if body else None) |
85 | 82 |
|
86 | 83 |
status = int(resp.status) |
b/snf-cyclades-app/synnefo/api/delegate.py | ||
---|---|---|
42 | 42 |
USER_CATALOG_URL = getattr(settings, 'CYCLADES_USER_CATALOG_URL', None) |
43 | 43 |
USER_FEEDBACK_URL = getattr(settings, 'CYCLADES_USER_FEEDBACK_URL', None) |
44 | 44 |
|
45 |
from synnefo.lib.pool.http import get_http_connection
|
|
45 |
from synnefo.lib.pool.http import PooledHTTPConnection
|
|
46 | 46 |
|
47 | 47 |
logger = logging.getLogger(__name__) |
48 | 48 |
|
... | ... | |
57 | 57 |
kwargs['headers'].setdefault('content-type', 'application/json') |
58 | 58 |
kwargs['headers'].setdefault('content-length', len(body) if body else 0) |
59 | 59 |
|
60 |
conn = get_http_connection(p.netloc, p.scheme) |
|
61 |
try: |
|
60 |
with PooledHTTPConnection(p.netloc, p.scheme) as conn: |
|
62 | 61 |
conn.request(request.method, p.path + '?' + p.query, **kwargs) |
63 | 62 |
response = conn.getresponse() |
64 | 63 |
length = response.getheader('content-length', None) |
65 | 64 |
data = response.read(length) |
66 | 65 |
status = int(response.status) |
67 | 66 |
return HttpResponse(data, status=status) |
68 |
finally: |
|
69 |
conn.close() |
|
70 | 67 |
|
71 | 68 |
|
72 | 69 |
@csrf_exempt |
b/snf-pithos-app/pithos/api/delegate.py | ||
---|---|---|
46 | 46 |
from pithos.api.settings import ( |
47 | 47 |
AUTHENTICATION_USERS, USER_LOGIN_URL, USER_FEEDBACK_URL, USER_CATALOG_URL) |
48 | 48 |
|
49 |
from synnefo.lib.pool.http import get_http_connection
|
|
49 |
from synnefo.lib.pool.http import PooledHTTPConnection
|
|
50 | 50 |
|
51 | 51 |
logger = logging.getLogger(__name__) |
52 | 52 |
|
... | ... | |
79 | 79 |
kwargs['headers'].setdefault('content-type', 'application/json') |
80 | 80 |
kwargs['headers'].setdefault('content-length', len(body) if body else 0) |
81 | 81 |
|
82 |
conn = get_http_connection(p.netloc, p.scheme) |
|
83 |
try: |
|
82 |
with PooledHTTPConnection(p.netloc, p.scheme) as conn: |
|
84 | 83 |
conn.request(request.method, p.path + '?' + p.query, **kwargs) |
85 | 84 |
response = conn.getresponse() |
86 | 85 |
length = response.getheader('content-length', None) |
87 | 86 |
data = response.read(length) |
88 | 87 |
status = int(response.status) |
89 | 88 |
return HttpResponse(data, status=status) |
90 |
finally: |
|
91 |
conn.close() |
|
92 | 89 |
|
93 | 90 |
@csrf_exempt |
94 | 91 |
def delegate_to_feedback_service(request): |
Also available in: Unified diff