Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / db / pooled_psycopg2 / __init__.py @ ab133e68

History | View | Annotate | Download (6.5 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
#
34

    
35
import psycopg2
36
from synnefo.lib.pool import ObjectPool
37

    
38
import logging
39
logger = logging.getLogger(__name__)
40

    
41

    
42
# Quick-n-dirty logging setup to stderr
43

    
44
#LOG_FORMAT = "%(asctime)-15s %(levelname)-6s %(message)s"
45
#handler = logging.StreamHandler()
46
#handler = logging.FileHandler("/tmp/aa1")
47
#handler.setFormatter(logging.Formatter(LOG_FORMAT))
48
#logger.addHandler(handler)
49
#logger.setLevel(logging.DEBUG)
50
#logger.fatal("kalhmera1")
51
#import sys
52
#print >>sys.stderr, "logger is %d" % id(logger)
53

    
54
_pool_kwargs = None
55
_pool = None
56

    
57

    
58
# How many times to retry on getting a live connection
59
# from the pool, before giving up.
60
RETRY_LIMIT = 1000
61

    
62

    
63
class PooledConnection(object):
64
    """Thin wrapper around a psycopg2 connection for a pooled connection.
65

66
    It takes care to put itself back into the pool upon connection closure.
67

68
    """
69
    def __init__(self, pool, conn):
70
        #print >>sys.stderr,"LOGGER is %d" % id(logger)
71
        logger.fatal("INIT POOLED CONN: pool = %s, conn = %s", pool, conn)
72
        self._pool = pool
73
        self._conn = conn
74

    
75
    def close(self):
76
        logger.debug("CLOSE POOLED CONN: self._pool = %s", self._pool)
77
        if not self._pool:
78
            return
79

    
80
        pool = self._pool
81
        logger.debug("PUT POOLED CONN: About to return %s to the pool", self)
82
        pool.pool_put(self)
83
        logger.debug("FINISHED PUT POOLED CONN: Returned %s to the pool", self)
84

    
85
    def __getattr__(self, attr):
86
        """Proxy every other call to the real connection"""
87
        return getattr(self._conn, attr)
88

    
89
    def __setattr__(self, attr, val):
90
        if attr not in ("_pool", "_conn"):
91
            setattr(self._conn, attr, val)
92
        object.__setattr__(self, attr, val)
93

    
94
    #def __del__(self):
95
    #    pass
96
    #    print >>sys.stderr, "DELETED"
97

    
98

    
99
class Psycopg2ConnectionPool(ObjectPool):
100
    """A synnefo.lib.pool.ObjectPool of psycopg2 connections.
101

102
    Every connection knows how to return itself to the pool
103
    when it gets close()d.
104

105
    """
106

    
107
    def __init__(self, **kw):
108
        ObjectPool.__init__(self, size=kw["synnefo_poolsize"])
109
        kw.pop("synnefo_poolsize")
110
        self._connection_args = kw
111

    
112
    def _pool_create(self):
113
        logger.info("CREATE: About to get a new connection from psycopg2")
114
        conn = psycopg2._original_connect(**self._connection_args)
115
        logger.info("CREATED: Got connection %s from psycopg2", conn)
116
        return PooledConnection(self, conn)
117

    
118
    def _pool_cleanup(self, pooledconn):
119
        logger.debug("CLEANING, conn = %d", id(pooledconn))
120
        try:
121
            # Reset this connection before putting it back
122
            # into the pool
123
            cursor = pooledconn.cursor()
124
            cursor.execute("ABORT; RESET ALL")
125
        except psycopg2.Error:
126
            # Since we're not going to be putting the psycopg2 connection
127
            # back into the pool, close it uncoditionally.
128
            logger.error("DEAD connection, conn = %d, %s",
129
                         id(pooledconn), pooledconn)
130
            try:
131
                pooledconn._conn.close()
132
            except:
133
                pass
134
            return True
135
        return False
136

    
137

    
138
def _init_pool(kw):
139
    global _pool
140
    global _pool_kwargs
141

    
142
    _pool_kwargs = kw
143
    _pool = Psycopg2ConnectionPool(**kw)
144

    
145

    
146
def _get_pool(kw):
147
    if not _pool:
148
        logger.debug("POOLINIT: Initializing DB connection pool")
149
        _init_pool(kw)
150

    
151
    if _pool_kwargs != kw:
152
        raise NotImplementedError(("Requested pooled psycopg2 connection with "
153
                                   "args %s != %s." % (kw, _pool_kwargs)))
154
    return _pool
155

    
156

    
157
def _verify_connection(pooledconn):
158
    try:
159
        cursor = pooledconn.cursor()
160
        cursor.execute("SELECT 1")
161
    except psycopg2.Error:
162
        # The connection has died.
163
        pooledconn.close()
164
        return False
165
    return True
166

    
167

    
168
def _pooled_connect(**kw):
169
    poolsize = kw.get("synnefo_poolsize", 0)
170
    if not poolsize:
171
        kw.pop("synnefo_poolsize", None)
172
        return psycopg2._original_connect(**kw)
173

    
174
    pool = _get_pool(kw)
175
    logger.debug("GET: Pool: set: %d, semaphore: %d",
176
                 len(pool._set), pool._semaphore._Semaphore__value)
177
    r = None
178
    n = 0
179
    while not r:
180
        r = pool.pool_get()
181
        if not _verify_connection(r):
182
            logger.error("DEADCONNECTION: Got dead connection %d from pool",
183
                         id(r))
184
            r = None
185
            n += 1
186
            if n > RETRY_LIMIT:
187
                raise RuntimeError(("Could not get live connection from pool"
188
                                    "after %d retries." % RETRY_LIMIT))
189
    logger.debug("GOT: Got connection %d from pool", id(r))
190
    return r
191

    
192

    
193
def monkey_patch_psycopg2():
194
    """Monkey-patch psycopg2's connect(), to retrieve connections from a pool.
195

196
    To enable pooling, you need to pass a synnefo_poolsize argument
197
    inside psycopg2's connection options.
198

199
    """
200

    
201
    if hasattr(psycopg2, '_original_connect'):
202
        return
203

    
204
    psycopg2._original_connect = psycopg2.connect
205
    psycopg2.connect = _pooled_connect