Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / db / pooled_psycopg2 / __init__.py @ 1a736ca8

History | View | Annotate | Download (5.6 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
#
34

    
35
import psycopg2
36
from objpool import ObjectPool
37

    
38
from select import select
39
import logging
40
log = logging.getLogger(__name__)
41

    
42

    
43
_pool_kwargs = None
44
_pool = None
45

    
46

    
47
# How many times to retry on getting a live connection
48
# from the pool, before giving up.
49
RETRY_LIMIT = 1000
50

    
51

    
52
class PooledConnection(object):
53
    """Thin wrapper around a psycopg2 connection for a pooled connection.
54

55
    It takes care to put itself back into the pool upon connection closure.
56

57
    """
58
    def __init__(self, pool, conn):
59
        log.debug("INIT-POOLED: pool = %s, conn = %s", pool, conn)
60
        self._pool = pool
61
        self._conn = conn
62

    
63
    def close(self):
64
        log.debug("CLOSE-POOLED: self._pool = %s", self._pool)
65
        if not self._pool:
66
            return
67

    
68
        pool = self._pool
69
        log.debug("PUT-POOLED-BEFORE: about to return %s to the pool", self)
70
        pool.pool_put(self)
71
        log.debug("PUT-POOLED-AFTER: returned %s to the pool", self)
72

    
73
    def __getattr__(self, attr):
74
        """Proxy every other call to the real connection"""
75
        return getattr(self._conn, attr)
76

    
77
    def __setattr__(self, attr, val):
78
        if attr not in ("_pool", "_conn"):
79
            setattr(self._conn, attr, val)
80
        object.__setattr__(self, attr, val)
81

    
82

    
83
class Psycopg2ConnectionPool(ObjectPool):
84
    """A objpool.ObjectPool of psycopg2 connections.
85

86
    Every connection knows how to return itself to the pool
87
    when it gets close()d.
88

89
    """
90

    
91
    def __init__(self, **kw):
92
        ObjectPool.__init__(self, size=kw["synnefo_poolsize"])
93
        kw.pop("synnefo_poolsize")
94
        self._connection_args = kw
95

    
96
    def _pool_create(self):
97
        log.info("CREATE: about to get a new connection from psycopg2")
98
        conn = psycopg2._original_connect(**self._connection_args)
99
        log.info("CREATED: got connection %s from psycopg2", conn)
100
        return PooledConnection(self, conn)
101

    
102
    def _pool_verify_execute(pooledconn):
103
        try:
104
            cursor = pooledconn.cursor()
105
            cursor.execute("SELECT 1")
106
        except psycopg2.Error:
107
            # The connection has died.
108
            pooledconn.close()
109
            return False
110
        return True
111

    
112
    def _pool_verify(self, conn):
113
        if select((conn.fileno(),), (), (), 0)[0]:
114
            return False
115
        return True
116

    
117
    def _pool_cleanup(self, pooledconn):
118
        log.debug("CLEANING, conn = %d", id(pooledconn))
119
        try:
120
            # Reset this connection before putting it back
121
            # into the pool
122
            pooledconn.rollback()
123
        except psycopg2.Error:
124
            # Since we're not going to be putting the psycopg2 connection
125
            # back into the pool, close it uncoditionally.
126
            log.error("Detected dead connection, conn = %d, %s",
127
                      id(pooledconn), pooledconn)
128
            try:
129
                pooledconn._conn.close()
130
            except:
131
                pass
132
            return True
133
        return False
134

    
135

    
136
def _init_pool(kw):
137
    global _pool
138
    global _pool_kwargs
139

    
140
    _pool_kwargs = kw
141
    _pool = Psycopg2ConnectionPool(**kw)
142

    
143

    
144
def _get_pool(kw):
145
    if not _pool:
146
        log.debug("INIT-POOL: Initializing DB connection pool")
147
        _init_pool(kw)
148

    
149
    if _pool_kwargs != kw:
150
        raise NotImplementedError(("Requested pooled psycopg2 connection with "
151
                                   "args %s != %s." % (kw, _pool_kwargs)))
152
    return _pool
153

    
154

    
155
def _pooled_connect(**kw):
156
    poolsize = kw.get("synnefo_poolsize", 0)
157
    if not poolsize:
158
        kw.pop("synnefo_poolsize", None)
159
        return psycopg2._original_connect(**kw)
160

    
161
    pool = _get_pool(kw)
162
    log.debug("GET-POOL: Pool: %r", pool)
163
    r = pool.pool_get()
164
    log.debug("GOT-POOL: Got connection %d from pool %r", id(r), pool)
165
    return r
166

    
167

    
168
def monkey_patch_psycopg2():
169
    """Monkey-patch psycopg2's connect(), to retrieve connections from a pool.
170

171
    To enable pooling, you need to pass a synnefo_poolsize argument
172
    inside psycopg2's connection options.
173

174
    """
175

    
176
    if hasattr(psycopg2, '_original_connect'):
177
        return
178

    
179
    psycopg2._original_connect = psycopg2.connect
180
    psycopg2.connect = _pooled_connect