Revision ab133e68
b/snf-common/synnefo/lib/db/pooled_psycopg2/__init__.py | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
# |
|
34 |
|
|
35 |
import psycopg2 |
|
36 |
from synnefo.lib.pool import ObjectPool |
|
37 |
|
|
38 |
import logging |
|
39 |
logger = logging.getLogger(__name__) |
|
40 |
|
|
41 |
|
|
42 |
# Quick-n-dirty logging setup to stderr |
|
43 |
|
|
44 |
#LOG_FORMAT = "%(asctime)-15s %(levelname)-6s %(message)s" |
|
45 |
#handler = logging.StreamHandler() |
|
46 |
#handler = logging.FileHandler("/tmp/aa1") |
|
47 |
#handler.setFormatter(logging.Formatter(LOG_FORMAT)) |
|
48 |
#logger.addHandler(handler) |
|
49 |
#logger.setLevel(logging.DEBUG) |
|
50 |
#logger.fatal("kalhmera1") |
|
51 |
#import sys |
|
52 |
#print >>sys.stderr, "logger is %d" % id(logger) |
|
53 |
|
|
54 |
_pool_kwargs = None |
|
55 |
_pool = None |
|
56 |
|
|
57 |
|
|
58 |
# How many times to retry on getting a live connection |
|
59 |
# from the pool, before giving up. |
|
60 |
RETRY_LIMIT = 1000 |
|
61 |
|
|
62 |
|
|
63 |
class PooledConnection(object): |
|
64 |
"""Thin wrapper around a psycopg2 connection for a pooled connection. |
|
65 |
|
|
66 |
It takes care to put itself back into the pool upon connection closure. |
|
67 |
|
|
68 |
""" |
|
69 |
def __init__(self, pool, conn): |
|
70 |
#print >>sys.stderr,"LOGGER is %d" % id(logger) |
|
71 |
logger.fatal("INIT POOLED CONN: pool = %s, conn = %s", pool, conn) |
|
72 |
self._pool = pool |
|
73 |
self._conn = conn |
|
74 |
|
|
75 |
def close(self): |
|
76 |
logger.debug("CLOSE POOLED CONN: self._pool = %s", self._pool) |
|
77 |
if not self._pool: |
|
78 |
return |
|
79 |
|
|
80 |
pool = self._pool |
|
81 |
logger.debug("PUT POOLED CONN: About to return %s to the pool", self) |
|
82 |
pool.pool_put(self) |
|
83 |
logger.debug("FINISHED PUT POOLED CONN: Returned %s to the pool", self) |
|
84 |
|
|
85 |
def __getattr__(self, attr): |
|
86 |
"""Proxy every other call to the real connection""" |
|
87 |
return getattr(self._conn, attr) |
|
88 |
|
|
89 |
def __setattr__(self, attr, val): |
|
90 |
if attr not in ("_pool", "_conn"): |
|
91 |
setattr(self._conn, attr, val) |
|
92 |
object.__setattr__(self, attr, val) |
|
93 |
|
|
94 |
#def __del__(self): |
|
95 |
# pass |
|
96 |
# print >>sys.stderr, "DELETED" |
|
97 |
|
|
98 |
|
|
99 |
class Psycopg2ConnectionPool(ObjectPool): |
|
100 |
"""A synnefo.lib.pool.ObjectPool of psycopg2 connections. |
|
101 |
|
|
102 |
Every connection knows how to return itself to the pool |
|
103 |
when it gets close()d. |
|
104 |
|
|
105 |
""" |
|
106 |
|
|
107 |
def __init__(self, **kw): |
|
108 |
ObjectPool.__init__(self, size=kw["synnefo_poolsize"]) |
|
109 |
kw.pop("synnefo_poolsize") |
|
110 |
self._connection_args = kw |
|
111 |
|
|
112 |
def _pool_create(self): |
|
113 |
logger.info("CREATE: About to get a new connection from psycopg2") |
|
114 |
conn = psycopg2._original_connect(**self._connection_args) |
|
115 |
logger.info("CREATED: Got connection %s from psycopg2", conn) |
|
116 |
return PooledConnection(self, conn) |
|
117 |
|
|
118 |
def _pool_cleanup(self, pooledconn): |
|
119 |
logger.debug("CLEANING, conn = %d", id(pooledconn)) |
|
120 |
try: |
|
121 |
# Reset this connection before putting it back |
|
122 |
# into the pool |
|
123 |
cursor = pooledconn.cursor() |
|
124 |
cursor.execute("ABORT; RESET ALL") |
|
125 |
except psycopg2.Error: |
|
126 |
# Since we're not going to be putting the psycopg2 connection |
|
127 |
# back into the pool, close it uncoditionally. |
|
128 |
logger.error("DEAD connection, conn = %d, %s", |
|
129 |
id(pooledconn), pooledconn) |
|
130 |
try: |
|
131 |
pooledconn._conn.close() |
|
132 |
except: |
|
133 |
pass |
|
134 |
return True |
|
135 |
return False |
|
136 |
|
|
137 |
|
|
138 |
def _init_pool(kw): |
|
139 |
global _pool |
|
140 |
global _pool_kwargs |
|
141 |
|
|
142 |
_pool_kwargs = kw |
|
143 |
_pool = Psycopg2ConnectionPool(**kw) |
|
144 |
|
|
145 |
|
|
146 |
def _get_pool(kw): |
|
147 |
if not _pool: |
|
148 |
logger.debug("POOLINIT: Initializing DB connection pool") |
|
149 |
_init_pool(kw) |
|
150 |
|
|
151 |
if _pool_kwargs != kw: |
|
152 |
raise NotImplementedError(("Requested pooled psycopg2 connection with " |
|
153 |
"args %s != %s." % (kw, _pool_kwargs))) |
|
154 |
return _pool |
|
155 |
|
|
156 |
|
|
157 |
def _verify_connection(pooledconn): |
|
158 |
try: |
|
159 |
cursor = pooledconn.cursor() |
|
160 |
cursor.execute("SELECT 1") |
|
161 |
except psycopg2.Error: |
|
162 |
# The connection has died. |
|
163 |
pooledconn.close() |
|
164 |
return False |
|
165 |
return True |
|
166 |
|
|
167 |
|
|
168 |
def _pooled_connect(**kw): |
|
169 |
poolsize = kw.get("synnefo_poolsize", 0) |
|
170 |
if not poolsize: |
|
171 |
kw.pop("synnefo_poolsize", None) |
|
172 |
return psycopg2._original_connect(**kw) |
|
173 |
|
|
174 |
pool = _get_pool(kw) |
|
175 |
logger.debug("GET: Pool: set: %d, semaphore: %d", |
|
176 |
len(pool._set), pool._semaphore._Semaphore__value) |
|
177 |
r = None |
|
178 |
n = 0 |
|
179 |
while not r: |
|
180 |
r = pool.pool_get() |
|
181 |
if not _verify_connection(r): |
|
182 |
logger.error("DEADCONNECTION: Got dead connection %d from pool", |
|
183 |
id(r)) |
|
184 |
r = None |
|
185 |
n += 1 |
|
186 |
if n > RETRY_LIMIT: |
|
187 |
raise RuntimeError(("Could not get live connection from pool" |
|
188 |
"after %d retries." % RETRY_LIMIT)) |
|
189 |
logger.debug("GOT: Got connection %d from pool", id(r)) |
|
190 |
return r |
|
191 |
|
|
192 |
|
|
193 |
def monkey_patch_psycopg2(): |
|
194 |
"""Monkey-patch psycopg2's connect(), to retrieve connections from a pool. |
|
195 |
|
|
196 |
To enable pooling, you need to pass a synnefo_poolsize argument |
|
197 |
inside psycopg2's connection options. |
|
198 |
|
|
199 |
""" |
|
200 |
|
|
201 |
if hasattr(psycopg2, '_original_connect'): |
|
202 |
return |
|
203 |
|
|
204 |
psycopg2._original_connect = psycopg2.connect |
|
205 |
psycopg2.connect = _pooled_connect |
b/snf-common/synnefo/lib/db/psyco_gevent.py | ||
---|---|---|
1 |
# Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com> |
|
2 |
# and licensed under the MIT license: |
|
3 |
# |
|
4 |
# Permission is hereby granted, free of charge, to any person obtaining a copy |
|
5 |
# of this software and associated documentation files (the "Software"), to deal |
|
6 |
# in the Software without restriction, including without limitation the rights |
|
7 |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|
8 |
# copies of the Software, and to permit persons to whom the Software is |
|
9 |
# furnished to do so, subject to the following conditions: |
|
10 |
# |
|
11 |
# The above copyright notice and this permission notice shall be included in |
|
12 |
# all copies or substantial portions of the Software. |
|
13 |
# |
|
14 |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|
15 |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|
16 |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|
17 |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|
18 |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|
19 |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
|
20 |
# THE SOFTWARE. |
|
21 |
"""A wait callback to allow psycopg2 cooperation with gevent. |
|
22 |
|
|
23 |
Use `make_psycopg_green()` to enable gevent support in Psycopg. |
|
24 |
|
|
25 |
""" |
|
26 |
|
|
27 |
import psycopg2 |
|
28 |
from psycopg2 import extensions |
|
29 |
|
|
30 |
from gevent.socket import wait_read, wait_write |
|
31 |
|
|
32 |
|
|
33 |
def make_psycopg_green(): |
|
34 |
"""Configure Psycopg to be used with gevent in non-blocking way.""" |
|
35 |
if not hasattr(extensions, 'set_wait_callback'): |
|
36 |
raise ImportError( |
|
37 |
"support for coroutines not available in this Psycopg version (%s)" |
|
38 |
% psycopg2.__version__) |
|
39 |
|
|
40 |
extensions.set_wait_callback(gevent_wait_callback) |
|
41 |
|
|
42 |
|
|
43 |
def gevent_wait_callback(conn, timeout=None): |
|
44 |
"""A wait callback useful to allow gevent to work with Psycopg.""" |
|
45 |
while 1: |
|
46 |
state = conn.poll() |
|
47 |
if state == extensions.POLL_OK: |
|
48 |
break |
|
49 |
elif state == extensions.POLL_READ: |
|
50 |
wait_read(conn.fileno(), timeout=timeout) |
|
51 |
elif state == extensions.POLL_WRITE: |
|
52 |
wait_write(conn.fileno(), timeout=timeout) |
|
53 |
else: |
|
54 |
raise psycopg2.OperationalError("Bad result from poll: %r" % state) |
/dev/null | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
# |
|
34 |
|
|
35 |
import psycopg2 |
|
36 |
from synnefo.lib.pool import ObjectPool |
|
37 |
|
|
38 |
import logging |
|
39 |
logger = logging.getLogger(__name__) |
|
40 |
|
|
41 |
|
|
42 |
# Quick-n-dirty logging setup to stderr |
|
43 |
|
|
44 |
#LOG_FORMAT = "%(asctime)-15s %(levelname)-6s %(message)s" |
|
45 |
#handler = logging.StreamHandler() |
|
46 |
#handler = logging.FileHandler("/tmp/aa1") |
|
47 |
#handler.setFormatter(logging.Formatter(LOG_FORMAT)) |
|
48 |
#logger.addHandler(handler) |
|
49 |
#logger.setLevel(logging.DEBUG) |
|
50 |
#logger.fatal("kalhmera1") |
|
51 |
#import sys |
|
52 |
#print >>sys.stderr, "logger is %d" % id(logger) |
|
53 |
|
|
54 |
_pool_kwargs = None |
|
55 |
_pool = None |
|
56 |
|
|
57 |
|
|
58 |
# How many times to retry on getting a live connection |
|
59 |
# from the pool, before giving up. |
|
60 |
RETRY_LIMIT = 1000 |
|
61 |
|
|
62 |
|
|
63 |
class PooledConnection(object): |
|
64 |
"""Thin wrapper around a psycopg2 connection for a pooled connection. |
|
65 |
|
|
66 |
It takes care to put itself back into the pool upon connection closure. |
|
67 |
|
|
68 |
""" |
|
69 |
def __init__(self, pool, conn): |
|
70 |
#print >>sys.stderr,"LOGGER is %d" % id(logger) |
|
71 |
logger.fatal("INIT POOLED CONN: pool = %s, conn = %s", pool, conn) |
|
72 |
self._pool = pool |
|
73 |
self._conn = conn |
|
74 |
|
|
75 |
def close(self): |
|
76 |
logger.debug("CLOSE POOLED CONN: self._pool = %s", self._pool) |
|
77 |
if not self._pool: |
|
78 |
return |
|
79 |
|
|
80 |
pool = self._pool |
|
81 |
logger.debug("PUT POOLED CONN: About to return %s to the pool", self) |
|
82 |
pool.pool_put(self) |
|
83 |
logger.debug("FINISHED PUT POOLED CONN: Returned %s to the pool", self) |
|
84 |
|
|
85 |
def __getattr__(self, attr): |
|
86 |
"""Proxy every other call to the real connection""" |
|
87 |
return getattr(self._conn, attr) |
|
88 |
|
|
89 |
def __setattr__(self, attr, val): |
|
90 |
if attr not in ("_pool", "_conn"): |
|
91 |
setattr(self._conn, attr, val) |
|
92 |
object.__setattr__(self, attr, val) |
|
93 |
|
|
94 |
#def __del__(self): |
|
95 |
# pass |
|
96 |
# print >>sys.stderr, "DELETED" |
|
97 |
|
|
98 |
|
|
99 |
class Psycopg2ConnectionPool(ObjectPool): |
|
100 |
"""A synnefo.lib.pool.ObjectPool of psycopg2 connections. |
|
101 |
|
|
102 |
Every connection knows how to return itself to the pool |
|
103 |
when it gets close()d. |
|
104 |
|
|
105 |
""" |
|
106 |
|
|
107 |
def __init__(self, **kw): |
|
108 |
ObjectPool.__init__(self, size=kw["synnefo_poolsize"]) |
|
109 |
kw.pop("synnefo_poolsize") |
|
110 |
self._connection_args = kw |
|
111 |
|
|
112 |
def _pool_create(self): |
|
113 |
logger.info("CREATE: About to get a new connection from psycopg2") |
|
114 |
conn = psycopg2._original_connect(**self._connection_args) |
|
115 |
logger.info("CREATED: Got connection %s from psycopg2", conn) |
|
116 |
return PooledConnection(self, conn) |
|
117 |
|
|
118 |
def _pool_cleanup(self, pooledconn): |
|
119 |
logger.debug("CLEANING, conn = %d", id(pooledconn)) |
|
120 |
try: |
|
121 |
# Reset this connection before putting it back |
|
122 |
# into the pool |
|
123 |
cursor = pooledconn.cursor() |
|
124 |
cursor.execute("ABORT; RESET ALL") |
|
125 |
except psycopg2.Error: |
|
126 |
# Since we're not going to be putting the psycopg2 connection |
|
127 |
# back into the pool, close it uncoditionally. |
|
128 |
logger.error("DEAD connection, conn = %d, %s", |
|
129 |
id(pooledconn), pooledconn) |
|
130 |
try: |
|
131 |
pooledconn._conn.close() |
|
132 |
except: |
|
133 |
pass |
|
134 |
return True |
|
135 |
return False |
|
136 |
|
|
137 |
|
|
138 |
def _init_pool(kw): |
|
139 |
global _pool |
|
140 |
global _pool_kwargs |
|
141 |
|
|
142 |
_pool_kwargs = kw |
|
143 |
_pool = Psycopg2ConnectionPool(**kw) |
|
144 |
|
|
145 |
|
|
146 |
def _get_pool(kw): |
|
147 |
if not _pool: |
|
148 |
logger.debug("POOLINIT: Initializing DB connection pool") |
|
149 |
_init_pool(kw) |
|
150 |
|
|
151 |
if _pool_kwargs != kw: |
|
152 |
raise NotImplementedError(("Requested pooled psycopg2 connection with " |
|
153 |
"args %s != %s." % (kw, _pool_kwargs))) |
|
154 |
return _pool |
|
155 |
|
|
156 |
|
|
157 |
def _verify_connection(pooledconn): |
|
158 |
try: |
|
159 |
cursor = pooledconn.cursor() |
|
160 |
cursor.execute("SELECT 1") |
|
161 |
except psycopg2.Error: |
|
162 |
# The connection has died. |
|
163 |
pooledconn.close() |
|
164 |
return False |
|
165 |
return True |
|
166 |
|
|
167 |
|
|
168 |
def _pooled_connect(**kw): |
|
169 |
poolsize = kw.get("synnefo_poolsize", 0) |
|
170 |
if not poolsize: |
|
171 |
kw.pop("synnefo_poolsize", None) |
|
172 |
return psycopg2._original_connect(**kw) |
|
173 |
|
|
174 |
pool = _get_pool(kw) |
|
175 |
logger.debug("GET: Pool: set: %d, semaphore: %d", |
|
176 |
len(pool._set), pool._semaphore._Semaphore__value) |
|
177 |
r = None |
|
178 |
n = 0 |
|
179 |
while not r: |
|
180 |
r = pool.pool_get() |
|
181 |
if not _verify_connection(r): |
|
182 |
logger.error("DEADCONNECTION: Got dead connection %d from pool", |
|
183 |
id(r)) |
|
184 |
r = None |
|
185 |
n += 1 |
|
186 |
if n > RETRY_LIMIT: |
|
187 |
raise RuntimeError(("Could not get live connection from pool" |
|
188 |
"after %d retries." % RETRY_LIMIT)) |
|
189 |
logger.debug("GOT: Got connection %d from pool", id(r)) |
|
190 |
return r |
|
191 |
|
|
192 |
|
|
193 |
def monkey_patch_psycopg2(): |
|
194 |
"""Monkey-patch psycopg2's connect(), to retrieve connections from a pool. |
|
195 |
|
|
196 |
To enable pooling, you need to pass a synnefo_poolsize argument |
|
197 |
inside psycopg2's connection options. |
|
198 |
|
|
199 |
""" |
|
200 |
|
|
201 |
if hasattr(psycopg2, '_original_connect'): |
|
202 |
return |
|
203 |
|
|
204 |
psycopg2._original_connect = psycopg2.connect |
|
205 |
psycopg2.connect = _pooled_connect |
/dev/null | ||
---|---|---|
1 |
# Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com> |
|
2 |
# and licensed under the MIT license: |
|
3 |
# |
|
4 |
# Permission is hereby granted, free of charge, to any person obtaining a copy |
|
5 |
# of this software and associated documentation files (the "Software"), to deal |
|
6 |
# in the Software without restriction, including without limitation the rights |
|
7 |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|
8 |
# copies of the Software, and to permit persons to whom the Software is |
|
9 |
# furnished to do so, subject to the following conditions: |
|
10 |
# |
|
11 |
# The above copyright notice and this permission notice shall be included in |
|
12 |
# all copies or substantial portions of the Software. |
|
13 |
# |
|
14 |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|
15 |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|
16 |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|
17 |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|
18 |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|
19 |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
|
20 |
# THE SOFTWARE. |
|
21 |
"""A wait callback to allow psycopg2 cooperation with gevent. |
|
22 |
|
|
23 |
Use `make_psycopg_green()` to enable gevent support in Psycopg. |
|
24 |
|
|
25 |
""" |
|
26 |
|
|
27 |
import psycopg2 |
|
28 |
from psycopg2 import extensions |
|
29 |
|
|
30 |
from gevent.socket import wait_read, wait_write |
|
31 |
|
|
32 |
|
|
33 |
def make_psycopg_green(): |
|
34 |
"""Configure Psycopg to be used with gevent in non-blocking way.""" |
|
35 |
if not hasattr(extensions, 'set_wait_callback'): |
|
36 |
raise ImportError( |
|
37 |
"support for coroutines not available in this Psycopg version (%s)" |
|
38 |
% psycopg2.__version__) |
|
39 |
|
|
40 |
extensions.set_wait_callback(gevent_wait_callback) |
|
41 |
|
|
42 |
|
|
43 |
def gevent_wait_callback(conn, timeout=None): |
|
44 |
"""A wait callback useful to allow gevent to work with Psycopg.""" |
|
45 |
while 1: |
|
46 |
state = conn.poll() |
|
47 |
if state == extensions.POLL_OK: |
|
48 |
break |
|
49 |
elif state == extensions.POLL_READ: |
|
50 |
wait_read(conn.fileno(), timeout=timeout) |
|
51 |
elif state == extensions.POLL_WRITE: |
|
52 |
wait_write(conn.fileno(), timeout=timeout) |
|
53 |
else: |
|
54 |
raise psycopg2.OperationalError("Bad result from poll: %r" % state) |
Also available in: Unified diff