root / snf-common / synnefo / lib / db / pooled_psycopg2 / __init__.py @ 5f6ad491
History | View | Annotate | Download (5.6 kB)
1 | e83ed1fb | Vangelis Koukis | # Copyright 2012 GRNET S.A. All rights reserved.
|
---|---|---|---|
2 | e83ed1fb | Vangelis Koukis | #
|
3 | e83ed1fb | Vangelis Koukis | # Redistribution and use in source and binary forms, with or
|
4 | e83ed1fb | Vangelis Koukis | # without modification, are permitted provided that the following
|
5 | e83ed1fb | Vangelis Koukis | # conditions are met:
|
6 | e83ed1fb | Vangelis Koukis | #
|
7 | e83ed1fb | Vangelis Koukis | # 1. Redistributions of source code must retain the above
|
8 | e83ed1fb | Vangelis Koukis | # copyright notice, this list of conditions and the following
|
9 | e83ed1fb | Vangelis Koukis | # disclaimer.
|
10 | e83ed1fb | Vangelis Koukis | #
|
11 | e83ed1fb | Vangelis Koukis | # 2. Redistributions in binary form must reproduce the above
|
12 | e83ed1fb | Vangelis Koukis | # copyright notice, this list of conditions and the following
|
13 | e83ed1fb | Vangelis Koukis | # disclaimer in the documentation and/or other materials
|
14 | e83ed1fb | Vangelis Koukis | # provided with the distribution.
|
15 | e83ed1fb | Vangelis Koukis | #
|
16 | e83ed1fb | Vangelis Koukis | # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 | e83ed1fb | Vangelis Koukis | # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 | e83ed1fb | Vangelis Koukis | # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 | e83ed1fb | Vangelis Koukis | # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 | e83ed1fb | Vangelis Koukis | # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 | e83ed1fb | Vangelis Koukis | # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 | e83ed1fb | Vangelis Koukis | # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 | e83ed1fb | Vangelis Koukis | # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 | e83ed1fb | Vangelis Koukis | # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 | e83ed1fb | Vangelis Koukis | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 | e83ed1fb | Vangelis Koukis | # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 | e83ed1fb | Vangelis Koukis | # POSSIBILITY OF SUCH DAMAGE.
|
28 | e83ed1fb | Vangelis Koukis | #
|
29 | e83ed1fb | Vangelis Koukis | # The views and conclusions contained in the software and
|
30 | e83ed1fb | Vangelis Koukis | # documentation are those of the authors and should not be
|
31 | e83ed1fb | Vangelis Koukis | # interpreted as representing official policies, either expressed
|
32 | e83ed1fb | Vangelis Koukis | # or implied, of GRNET S.A.
|
33 | e83ed1fb | Vangelis Koukis | #
|
34 | e83ed1fb | Vangelis Koukis | |
35 | e83ed1fb | Vangelis Koukis | import psycopg2 |
36 | e83ed1fb | Vangelis Koukis | from synnefo.lib.pool import ObjectPool |
37 | e83ed1fb | Vangelis Koukis | |
38 | d8fe0948 | Georgios D. Tsoukalas | from select import select |
39 | e83ed1fb | Vangelis Koukis | import logging |
40 | 68453d22 | Christos Stavrakakis | log = logging.getLogger(__name__) |
41 | e83ed1fb | Vangelis Koukis | |
42 | e83ed1fb | Vangelis Koukis | |
43 | e83ed1fb | Vangelis Koukis | _pool_kwargs = None
|
44 | e83ed1fb | Vangelis Koukis | _pool = None
|
45 | e83ed1fb | Vangelis Koukis | |
46 | e83ed1fb | Vangelis Koukis | |
47 | e83ed1fb | Vangelis Koukis | # How many times to retry on getting a live connection
|
48 | e83ed1fb | Vangelis Koukis | # from the pool, before giving up.
|
49 | e83ed1fb | Vangelis Koukis | RETRY_LIMIT = 1000
|
50 | e83ed1fb | Vangelis Koukis | |
51 | e83ed1fb | Vangelis Koukis | |
52 | e83ed1fb | Vangelis Koukis | class PooledConnection(object): |
53 | e83ed1fb | Vangelis Koukis | """Thin wrapper around a psycopg2 connection for a pooled connection.
|
54 | e83ed1fb | Vangelis Koukis |
|
55 | e83ed1fb | Vangelis Koukis | It takes care to put itself back into the pool upon connection closure.
|
56 | e83ed1fb | Vangelis Koukis |
|
57 | e83ed1fb | Vangelis Koukis | """
|
58 | e83ed1fb | Vangelis Koukis | def __init__(self, pool, conn): |
59 | 68453d22 | Christos Stavrakakis | log.debug("INIT-POOLED: pool = %s, conn = %s", pool, conn)
|
60 | e83ed1fb | Vangelis Koukis | self._pool = pool
|
61 | e83ed1fb | Vangelis Koukis | self._conn = conn
|
62 | e83ed1fb | Vangelis Koukis | |
63 | e83ed1fb | Vangelis Koukis | def close(self): |
64 | 68453d22 | Christos Stavrakakis | log.debug("CLOSE-POOLED: self._pool = %s", self._pool) |
65 | e83ed1fb | Vangelis Koukis | if not self._pool: |
66 | e83ed1fb | Vangelis Koukis | return
|
67 | e83ed1fb | Vangelis Koukis | |
68 | e83ed1fb | Vangelis Koukis | pool = self._pool
|
69 | 68453d22 | Christos Stavrakakis | log.debug("PUT-POOLED-BEFORE: about to return %s to the pool", self) |
70 | e83ed1fb | Vangelis Koukis | pool.pool_put(self)
|
71 | 68453d22 | Christos Stavrakakis | log.debug("PUT-POOLED-AFTER: returned %s to the pool", self) |
72 | e83ed1fb | Vangelis Koukis | |
73 | e83ed1fb | Vangelis Koukis | def __getattr__(self, attr): |
74 | e83ed1fb | Vangelis Koukis | """Proxy every other call to the real connection"""
|
75 | e83ed1fb | Vangelis Koukis | return getattr(self._conn, attr) |
76 | e83ed1fb | Vangelis Koukis | |
77 | e83ed1fb | Vangelis Koukis | def __setattr__(self, attr, val): |
78 | e83ed1fb | Vangelis Koukis | if attr not in ("_pool", "_conn"): |
79 | e83ed1fb | Vangelis Koukis | setattr(self._conn, attr, val) |
80 | e83ed1fb | Vangelis Koukis | object.__setattr__(self, attr, val) |
81 | e83ed1fb | Vangelis Koukis | |
82 | e83ed1fb | Vangelis Koukis | |
83 | e83ed1fb | Vangelis Koukis | class Psycopg2ConnectionPool(ObjectPool): |
84 | e83ed1fb | Vangelis Koukis | """A synnefo.lib.pool.ObjectPool of psycopg2 connections.
|
85 | e83ed1fb | Vangelis Koukis |
|
86 | e83ed1fb | Vangelis Koukis | Every connection knows how to return itself to the pool
|
87 | e83ed1fb | Vangelis Koukis | when it gets close()d.
|
88 | e83ed1fb | Vangelis Koukis |
|
89 | e83ed1fb | Vangelis Koukis | """
|
90 | e83ed1fb | Vangelis Koukis | |
91 | e83ed1fb | Vangelis Koukis | def __init__(self, **kw): |
92 | e83ed1fb | Vangelis Koukis | ObjectPool.__init__(self, size=kw["synnefo_poolsize"]) |
93 | e83ed1fb | Vangelis Koukis | kw.pop("synnefo_poolsize")
|
94 | e83ed1fb | Vangelis Koukis | self._connection_args = kw
|
95 | e83ed1fb | Vangelis Koukis | |
96 | e83ed1fb | Vangelis Koukis | def _pool_create(self): |
97 | 68453d22 | Christos Stavrakakis | log.info("CREATE: about to get a new connection from psycopg2")
|
98 | e83ed1fb | Vangelis Koukis | conn = psycopg2._original_connect(**self._connection_args)
|
99 | 68453d22 | Christos Stavrakakis | log.info("CREATED: got connection %s from psycopg2", conn)
|
100 | e83ed1fb | Vangelis Koukis | return PooledConnection(self, conn) |
101 | e83ed1fb | Vangelis Koukis | |
102 | d8fe0948 | Georgios D. Tsoukalas | def _pool_verify_execute(pooledconn): |
103 | d8fe0948 | Georgios D. Tsoukalas | try:
|
104 | d8fe0948 | Georgios D. Tsoukalas | cursor = pooledconn.cursor() |
105 | d8fe0948 | Georgios D. Tsoukalas | cursor.execute("SELECT 1")
|
106 | d8fe0948 | Georgios D. Tsoukalas | except psycopg2.Error:
|
107 | d8fe0948 | Georgios D. Tsoukalas | # The connection has died.
|
108 | d8fe0948 | Georgios D. Tsoukalas | pooledconn.close() |
109 | d8fe0948 | Georgios D. Tsoukalas | return False |
110 | d8fe0948 | Georgios D. Tsoukalas | return True |
111 | d8fe0948 | Georgios D. Tsoukalas | |
112 | d8fe0948 | Georgios D. Tsoukalas | def _pool_verify(self, conn): |
113 | d8fe0948 | Georgios D. Tsoukalas | if select((conn.fileno(),), (), (), 0)[0]: |
114 | d8fe0948 | Georgios D. Tsoukalas | return False |
115 | d8fe0948 | Georgios D. Tsoukalas | return True |
116 | d8fe0948 | Georgios D. Tsoukalas | |
117 | e83ed1fb | Vangelis Koukis | def _pool_cleanup(self, pooledconn): |
118 | 68453d22 | Christos Stavrakakis | log.debug("CLEANING, conn = %d", id(pooledconn)) |
119 | e83ed1fb | Vangelis Koukis | try:
|
120 | e83ed1fb | Vangelis Koukis | # Reset this connection before putting it back
|
121 | e83ed1fb | Vangelis Koukis | # into the pool
|
122 | 5f6ad491 | Christos Stavrakakis | pooledconn.rollback() |
123 | e83ed1fb | Vangelis Koukis | except psycopg2.Error:
|
124 | e83ed1fb | Vangelis Koukis | # Since we're not going to be putting the psycopg2 connection
|
125 | e83ed1fb | Vangelis Koukis | # back into the pool, close it uncoditionally.
|
126 | 68453d22 | Christos Stavrakakis | log.error("Detected dead connection, conn = %d, %s",
|
127 | 68453d22 | Christos Stavrakakis | id(pooledconn), pooledconn)
|
128 | e83ed1fb | Vangelis Koukis | try:
|
129 | e83ed1fb | Vangelis Koukis | pooledconn._conn.close() |
130 | e83ed1fb | Vangelis Koukis | except:
|
131 | e83ed1fb | Vangelis Koukis | pass
|
132 | e83ed1fb | Vangelis Koukis | return True |
133 | e83ed1fb | Vangelis Koukis | return False |
134 | e83ed1fb | Vangelis Koukis | |
135 | e83ed1fb | Vangelis Koukis | |
136 | e83ed1fb | Vangelis Koukis | def _init_pool(kw): |
137 | e83ed1fb | Vangelis Koukis | global _pool
|
138 | e83ed1fb | Vangelis Koukis | global _pool_kwargs
|
139 | e83ed1fb | Vangelis Koukis | |
140 | e83ed1fb | Vangelis Koukis | _pool_kwargs = kw |
141 | e83ed1fb | Vangelis Koukis | _pool = Psycopg2ConnectionPool(**kw) |
142 | e83ed1fb | Vangelis Koukis | |
143 | e83ed1fb | Vangelis Koukis | |
144 | e83ed1fb | Vangelis Koukis | def _get_pool(kw): |
145 | e83ed1fb | Vangelis Koukis | if not _pool: |
146 | 68453d22 | Christos Stavrakakis | log.debug("INIT-POOL: Initializing DB connection pool")
|
147 | e83ed1fb | Vangelis Koukis | _init_pool(kw) |
148 | e83ed1fb | Vangelis Koukis | |
149 | e83ed1fb | Vangelis Koukis | if _pool_kwargs != kw:
|
150 | e83ed1fb | Vangelis Koukis | raise NotImplementedError(("Requested pooled psycopg2 connection with " |
151 | e83ed1fb | Vangelis Koukis | "args %s != %s." % (kw, _pool_kwargs)))
|
152 | e83ed1fb | Vangelis Koukis | return _pool
|
153 | e83ed1fb | Vangelis Koukis | |
154 | e83ed1fb | Vangelis Koukis | |
155 | e83ed1fb | Vangelis Koukis | def _pooled_connect(**kw): |
156 | e83ed1fb | Vangelis Koukis | poolsize = kw.get("synnefo_poolsize", 0) |
157 | e83ed1fb | Vangelis Koukis | if not poolsize: |
158 | e83ed1fb | Vangelis Koukis | kw.pop("synnefo_poolsize", None) |
159 | e83ed1fb | Vangelis Koukis | return psycopg2._original_connect(**kw)
|
160 | e83ed1fb | Vangelis Koukis | |
161 | e83ed1fb | Vangelis Koukis | pool = _get_pool(kw) |
162 | 68453d22 | Christos Stavrakakis | log.debug("GET-POOL: Pool: %r", pool)
|
163 | d8fe0948 | Georgios D. Tsoukalas | r = pool.pool_get() |
164 | 68453d22 | Christos Stavrakakis | log.debug("GOT-POOL: Got connection %d from pool %r", id(r), pool) |
165 | e83ed1fb | Vangelis Koukis | return r
|
166 | e83ed1fb | Vangelis Koukis | |
167 | e83ed1fb | Vangelis Koukis | |
168 | e83ed1fb | Vangelis Koukis | def monkey_patch_psycopg2(): |
169 | e83ed1fb | Vangelis Koukis | """Monkey-patch psycopg2's connect(), to retrieve connections from a pool.
|
170 | e83ed1fb | Vangelis Koukis |
|
171 | e83ed1fb | Vangelis Koukis | To enable pooling, you need to pass a synnefo_poolsize argument
|
172 | e83ed1fb | Vangelis Koukis | inside psycopg2's connection options.
|
173 | e83ed1fb | Vangelis Koukis |
|
174 | e83ed1fb | Vangelis Koukis | """
|
175 | e83ed1fb | Vangelis Koukis | |
176 | e83ed1fb | Vangelis Koukis | if hasattr(psycopg2, '_original_connect'): |
177 | e83ed1fb | Vangelis Koukis | return
|
178 | e83ed1fb | Vangelis Koukis | |
179 | e83ed1fb | Vangelis Koukis | psycopg2._original_connect = psycopg2.connect |
180 | e83ed1fb | Vangelis Koukis | psycopg2.connect = _pooled_connect |