Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / db / pooled_psycopg2 / __init__.py @ 5f6ad491

History | View | Annotate | Download (5.6 kB)

1 e83ed1fb Vangelis Koukis
# Copyright 2012 GRNET S.A. All rights reserved.
2 e83ed1fb Vangelis Koukis
#
3 e83ed1fb Vangelis Koukis
# Redistribution and use in source and binary forms, with or
4 e83ed1fb Vangelis Koukis
# without modification, are permitted provided that the following
5 e83ed1fb Vangelis Koukis
# conditions are met:
6 e83ed1fb Vangelis Koukis
#
7 e83ed1fb Vangelis Koukis
#   1. Redistributions of source code must retain the above
8 e83ed1fb Vangelis Koukis
#      copyright notice, this list of conditions and the following
9 e83ed1fb Vangelis Koukis
#      disclaimer.
10 e83ed1fb Vangelis Koukis
#
11 e83ed1fb Vangelis Koukis
#   2. Redistributions in binary form must reproduce the above
12 e83ed1fb Vangelis Koukis
#      copyright notice, this list of conditions and the following
13 e83ed1fb Vangelis Koukis
#      disclaimer in the documentation and/or other materials
14 e83ed1fb Vangelis Koukis
#      provided with the distribution.
15 e83ed1fb Vangelis Koukis
#
16 e83ed1fb Vangelis Koukis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 e83ed1fb Vangelis Koukis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 e83ed1fb Vangelis Koukis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 e83ed1fb Vangelis Koukis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 e83ed1fb Vangelis Koukis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 e83ed1fb Vangelis Koukis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 e83ed1fb Vangelis Koukis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 e83ed1fb Vangelis Koukis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 e83ed1fb Vangelis Koukis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 e83ed1fb Vangelis Koukis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 e83ed1fb Vangelis Koukis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 e83ed1fb Vangelis Koukis
# POSSIBILITY OF SUCH DAMAGE.
28 e83ed1fb Vangelis Koukis
#
29 e83ed1fb Vangelis Koukis
# The views and conclusions contained in the software and
30 e83ed1fb Vangelis Koukis
# documentation are those of the authors and should not be
31 e83ed1fb Vangelis Koukis
# interpreted as representing official policies, either expressed
32 e83ed1fb Vangelis Koukis
# or implied, of GRNET S.A.
33 e83ed1fb Vangelis Koukis
#
34 e83ed1fb Vangelis Koukis
35 e83ed1fb Vangelis Koukis
import psycopg2
36 e83ed1fb Vangelis Koukis
from synnefo.lib.pool import ObjectPool
37 e83ed1fb Vangelis Koukis
38 d8fe0948 Georgios D. Tsoukalas
from select import select
39 e83ed1fb Vangelis Koukis
import logging
40 68453d22 Christos Stavrakakis
log = logging.getLogger(__name__)
41 e83ed1fb Vangelis Koukis
42 e83ed1fb Vangelis Koukis
43 e83ed1fb Vangelis Koukis
_pool_kwargs = None
44 e83ed1fb Vangelis Koukis
_pool = None
45 e83ed1fb Vangelis Koukis
46 e83ed1fb Vangelis Koukis
47 e83ed1fb Vangelis Koukis
# How many times to retry on getting a live connection
48 e83ed1fb Vangelis Koukis
# from the pool, before giving up.
49 e83ed1fb Vangelis Koukis
RETRY_LIMIT = 1000
50 e83ed1fb Vangelis Koukis
51 e83ed1fb Vangelis Koukis
52 e83ed1fb Vangelis Koukis
class PooledConnection(object):
53 e83ed1fb Vangelis Koukis
    """Thin wrapper around a psycopg2 connection for a pooled connection.
54 e83ed1fb Vangelis Koukis

55 e83ed1fb Vangelis Koukis
    It takes care to put itself back into the pool upon connection closure.
56 e83ed1fb Vangelis Koukis

57 e83ed1fb Vangelis Koukis
    """
58 e83ed1fb Vangelis Koukis
    def __init__(self, pool, conn):
59 68453d22 Christos Stavrakakis
        log.debug("INIT-POOLED: pool = %s, conn = %s", pool, conn)
60 e83ed1fb Vangelis Koukis
        self._pool = pool
61 e83ed1fb Vangelis Koukis
        self._conn = conn
62 e83ed1fb Vangelis Koukis
63 e83ed1fb Vangelis Koukis
    def close(self):
64 68453d22 Christos Stavrakakis
        log.debug("CLOSE-POOLED: self._pool = %s", self._pool)
65 e83ed1fb Vangelis Koukis
        if not self._pool:
66 e83ed1fb Vangelis Koukis
            return
67 e83ed1fb Vangelis Koukis
68 e83ed1fb Vangelis Koukis
        pool = self._pool
69 68453d22 Christos Stavrakakis
        log.debug("PUT-POOLED-BEFORE: about to return %s to the pool", self)
70 e83ed1fb Vangelis Koukis
        pool.pool_put(self)
71 68453d22 Christos Stavrakakis
        log.debug("PUT-POOLED-AFTER: returned %s to the pool", self)
72 e83ed1fb Vangelis Koukis
73 e83ed1fb Vangelis Koukis
    def __getattr__(self, attr):
74 e83ed1fb Vangelis Koukis
        """Proxy every other call to the real connection"""
75 e83ed1fb Vangelis Koukis
        return getattr(self._conn, attr)
76 e83ed1fb Vangelis Koukis
77 e83ed1fb Vangelis Koukis
    def __setattr__(self, attr, val):
78 e83ed1fb Vangelis Koukis
        if attr not in ("_pool", "_conn"):
79 e83ed1fb Vangelis Koukis
            setattr(self._conn, attr, val)
80 e83ed1fb Vangelis Koukis
        object.__setattr__(self, attr, val)
81 e83ed1fb Vangelis Koukis
82 e83ed1fb Vangelis Koukis
83 e83ed1fb Vangelis Koukis
class Psycopg2ConnectionPool(ObjectPool):
84 e83ed1fb Vangelis Koukis
    """A synnefo.lib.pool.ObjectPool of psycopg2 connections.
85 e83ed1fb Vangelis Koukis

86 e83ed1fb Vangelis Koukis
    Every connection knows how to return itself to the pool
87 e83ed1fb Vangelis Koukis
    when it gets close()d.
88 e83ed1fb Vangelis Koukis

89 e83ed1fb Vangelis Koukis
    """
90 e83ed1fb Vangelis Koukis
91 e83ed1fb Vangelis Koukis
    def __init__(self, **kw):
92 e83ed1fb Vangelis Koukis
        ObjectPool.__init__(self, size=kw["synnefo_poolsize"])
93 e83ed1fb Vangelis Koukis
        kw.pop("synnefo_poolsize")
94 e83ed1fb Vangelis Koukis
        self._connection_args = kw
95 e83ed1fb Vangelis Koukis
96 e83ed1fb Vangelis Koukis
    def _pool_create(self):
97 68453d22 Christos Stavrakakis
        log.info("CREATE: about to get a new connection from psycopg2")
98 e83ed1fb Vangelis Koukis
        conn = psycopg2._original_connect(**self._connection_args)
99 68453d22 Christos Stavrakakis
        log.info("CREATED: got connection %s from psycopg2", conn)
100 e83ed1fb Vangelis Koukis
        return PooledConnection(self, conn)
101 e83ed1fb Vangelis Koukis
102 d8fe0948 Georgios D. Tsoukalas
    def _pool_verify_execute(pooledconn):
103 d8fe0948 Georgios D. Tsoukalas
        try:
104 d8fe0948 Georgios D. Tsoukalas
            cursor = pooledconn.cursor()
105 d8fe0948 Georgios D. Tsoukalas
            cursor.execute("SELECT 1")
106 d8fe0948 Georgios D. Tsoukalas
        except psycopg2.Error:
107 d8fe0948 Georgios D. Tsoukalas
            # The connection has died.
108 d8fe0948 Georgios D. Tsoukalas
            pooledconn.close()
109 d8fe0948 Georgios D. Tsoukalas
            return False
110 d8fe0948 Georgios D. Tsoukalas
        return True
111 d8fe0948 Georgios D. Tsoukalas
112 d8fe0948 Georgios D. Tsoukalas
    def _pool_verify(self, conn):
113 d8fe0948 Georgios D. Tsoukalas
        if select((conn.fileno(),), (), (), 0)[0]:
114 d8fe0948 Georgios D. Tsoukalas
            return False
115 d8fe0948 Georgios D. Tsoukalas
        return True
116 d8fe0948 Georgios D. Tsoukalas
117 e83ed1fb Vangelis Koukis
    def _pool_cleanup(self, pooledconn):
118 68453d22 Christos Stavrakakis
        log.debug("CLEANING, conn = %d", id(pooledconn))
119 e83ed1fb Vangelis Koukis
        try:
120 e83ed1fb Vangelis Koukis
            # Reset this connection before putting it back
121 e83ed1fb Vangelis Koukis
            # into the pool
122 5f6ad491 Christos Stavrakakis
            pooledconn.rollback()
123 e83ed1fb Vangelis Koukis
        except psycopg2.Error:
124 e83ed1fb Vangelis Koukis
            # Since we're not going to be putting the psycopg2 connection
125 e83ed1fb Vangelis Koukis
            # back into the pool, close it uncoditionally.
126 68453d22 Christos Stavrakakis
            log.error("Detected dead connection, conn = %d, %s",
127 68453d22 Christos Stavrakakis
                      id(pooledconn), pooledconn)
128 e83ed1fb Vangelis Koukis
            try:
129 e83ed1fb Vangelis Koukis
                pooledconn._conn.close()
130 e83ed1fb Vangelis Koukis
            except:
131 e83ed1fb Vangelis Koukis
                pass
132 e83ed1fb Vangelis Koukis
            return True
133 e83ed1fb Vangelis Koukis
        return False
134 e83ed1fb Vangelis Koukis
135 e83ed1fb Vangelis Koukis
136 e83ed1fb Vangelis Koukis
def _init_pool(kw):
137 e83ed1fb Vangelis Koukis
    global _pool
138 e83ed1fb Vangelis Koukis
    global _pool_kwargs
139 e83ed1fb Vangelis Koukis
140 e83ed1fb Vangelis Koukis
    _pool_kwargs = kw
141 e83ed1fb Vangelis Koukis
    _pool = Psycopg2ConnectionPool(**kw)
142 e83ed1fb Vangelis Koukis
143 e83ed1fb Vangelis Koukis
144 e83ed1fb Vangelis Koukis
def _get_pool(kw):
145 e83ed1fb Vangelis Koukis
    if not _pool:
146 68453d22 Christos Stavrakakis
        log.debug("INIT-POOL: Initializing DB connection pool")
147 e83ed1fb Vangelis Koukis
        _init_pool(kw)
148 e83ed1fb Vangelis Koukis
149 e83ed1fb Vangelis Koukis
    if _pool_kwargs != kw:
150 e83ed1fb Vangelis Koukis
        raise NotImplementedError(("Requested pooled psycopg2 connection with "
151 e83ed1fb Vangelis Koukis
                                   "args %s != %s." % (kw, _pool_kwargs)))
152 e83ed1fb Vangelis Koukis
    return _pool
153 e83ed1fb Vangelis Koukis
154 e83ed1fb Vangelis Koukis
155 e83ed1fb Vangelis Koukis
def _pooled_connect(**kw):
156 e83ed1fb Vangelis Koukis
    poolsize = kw.get("synnefo_poolsize", 0)
157 e83ed1fb Vangelis Koukis
    if not poolsize:
158 e83ed1fb Vangelis Koukis
        kw.pop("synnefo_poolsize", None)
159 e83ed1fb Vangelis Koukis
        return psycopg2._original_connect(**kw)
160 e83ed1fb Vangelis Koukis
161 e83ed1fb Vangelis Koukis
    pool = _get_pool(kw)
162 68453d22 Christos Stavrakakis
    log.debug("GET-POOL: Pool: %r", pool)
163 d8fe0948 Georgios D. Tsoukalas
    r = pool.pool_get()
164 68453d22 Christos Stavrakakis
    log.debug("GOT-POOL: Got connection %d from pool %r", id(r), pool)
165 e83ed1fb Vangelis Koukis
    return r
166 e83ed1fb Vangelis Koukis
167 e83ed1fb Vangelis Koukis
168 e83ed1fb Vangelis Koukis
def monkey_patch_psycopg2():
169 e83ed1fb Vangelis Koukis
    """Monkey-patch psycopg2's connect(), to retrieve connections from a pool.
170 e83ed1fb Vangelis Koukis

171 e83ed1fb Vangelis Koukis
    To enable pooling, you need to pass a synnefo_poolsize argument
172 e83ed1fb Vangelis Koukis
    inside psycopg2's connection options.
173 e83ed1fb Vangelis Koukis

174 e83ed1fb Vangelis Koukis
    """
175 e83ed1fb Vangelis Koukis
176 e83ed1fb Vangelis Koukis
    if hasattr(psycopg2, '_original_connect'):
177 e83ed1fb Vangelis Koukis
        return
178 e83ed1fb Vangelis Koukis
179 e83ed1fb Vangelis Koukis
    psycopg2._original_connect = psycopg2.connect
180 e83ed1fb Vangelis Koukis
    psycopg2.connect = _pooled_connect