Revision ab133e68

b/snf-common/synnefo/lib/db/pooled_psycopg2/__init__.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
#
34

  
35
import psycopg2
36
from synnefo.lib.pool import ObjectPool
37

  
38
import logging
39
logger = logging.getLogger(__name__)
40

  
41

  
42
# Quick-n-dirty logging setup to stderr
43

  
44
#LOG_FORMAT = "%(asctime)-15s %(levelname)-6s %(message)s"
45
#handler = logging.StreamHandler()
46
#handler = logging.FileHandler("/tmp/aa1")
47
#handler.setFormatter(logging.Formatter(LOG_FORMAT))
48
#logger.addHandler(handler)
49
#logger.setLevel(logging.DEBUG)
50
#logger.fatal("kalhmera1")
51
#import sys
52
#print >>sys.stderr, "logger is %d" % id(logger)
53

  
54
_pool_kwargs = None
55
_pool = None
56

  
57

  
58
# How many times to retry on getting a live connection
59
# from the pool, before giving up.
60
RETRY_LIMIT = 1000
61

  
62

  
63
class PooledConnection(object):
64
    """Thin wrapper around a psycopg2 connection for a pooled connection.
65

  
66
    It takes care to put itself back into the pool upon connection closure.
67

  
68
    """
69
    def __init__(self, pool, conn):
70
        #print >>sys.stderr,"LOGGER is %d" % id(logger)
71
        logger.fatal("INIT POOLED CONN: pool = %s, conn = %s", pool, conn)
72
        self._pool = pool
73
        self._conn = conn
74

  
75
    def close(self):
76
        logger.debug("CLOSE POOLED CONN: self._pool = %s", self._pool)
77
        if not self._pool:
78
            return
79

  
80
        pool = self._pool
81
        logger.debug("PUT POOLED CONN: About to return %s to the pool", self)
82
        pool.pool_put(self)
83
        logger.debug("FINISHED PUT POOLED CONN: Returned %s to the pool", self)
84

  
85
    def __getattr__(self, attr):
86
        """Proxy every other call to the real connection"""
87
        return getattr(self._conn, attr)
88

  
89
    def __setattr__(self, attr, val):
90
        if attr not in ("_pool", "_conn"):
91
            setattr(self._conn, attr, val)
92
        object.__setattr__(self, attr, val)
93

  
94
    #def __del__(self):
95
    #    pass
96
    #    print >>sys.stderr, "DELETED"
97

  
98

  
99
class Psycopg2ConnectionPool(ObjectPool):
100
    """A synnefo.lib.pool.ObjectPool of psycopg2 connections.
101

  
102
    Every connection knows how to return itself to the pool
103
    when it gets close()d.
104

  
105
    """
106

  
107
    def __init__(self, **kw):
108
        ObjectPool.__init__(self, size=kw["synnefo_poolsize"])
109
        kw.pop("synnefo_poolsize")
110
        self._connection_args = kw
111

  
112
    def _pool_create(self):
113
        logger.info("CREATE: About to get a new connection from psycopg2")
114
        conn = psycopg2._original_connect(**self._connection_args)
115
        logger.info("CREATED: Got connection %s from psycopg2", conn)
116
        return PooledConnection(self, conn)
117

  
118
    def _pool_cleanup(self, pooledconn):
119
        logger.debug("CLEANING, conn = %d", id(pooledconn))
120
        try:
121
            # Reset this connection before putting it back
122
            # into the pool
123
            cursor = pooledconn.cursor()
124
            cursor.execute("ABORT; RESET ALL")
125
        except psycopg2.Error:
126
            # Since we're not going to be putting the psycopg2 connection
127
            # back into the pool, close it uncoditionally.
128
            logger.error("DEAD connection, conn = %d, %s",
129
                         id(pooledconn), pooledconn)
130
            try:
131
                pooledconn._conn.close()
132
            except:
133
                pass
134
            return True
135
        return False
136

  
137

  
138
def _init_pool(kw):
139
    global _pool
140
    global _pool_kwargs
141

  
142
    _pool_kwargs = kw
143
    _pool = Psycopg2ConnectionPool(**kw)
144

  
145

  
146
def _get_pool(kw):
147
    if not _pool:
148
        logger.debug("POOLINIT: Initializing DB connection pool")
149
        _init_pool(kw)
150

  
151
    if _pool_kwargs != kw:
152
        raise NotImplementedError(("Requested pooled psycopg2 connection with "
153
                                   "args %s != %s." % (kw, _pool_kwargs)))
154
    return _pool
155

  
156

  
157
def _verify_connection(pooledconn):
158
    try:
159
        cursor = pooledconn.cursor()
160
        cursor.execute("SELECT 1")
161
    except psycopg2.Error:
162
        # The connection has died.
163
        pooledconn.close()
164
        return False
165
    return True
166

  
167

  
168
def _pooled_connect(**kw):
169
    poolsize = kw.get("synnefo_poolsize", 0)
170
    if not poolsize:
171
        kw.pop("synnefo_poolsize", None)
172
        return psycopg2._original_connect(**kw)
173

  
174
    pool = _get_pool(kw)
175
    logger.debug("GET: Pool: set: %d, semaphore: %d",
176
                 len(pool._set), pool._semaphore._Semaphore__value)
177
    r = None
178
    n = 0
179
    while not r:
180
        r = pool.pool_get()
181
        if not _verify_connection(r):
182
            logger.error("DEADCONNECTION: Got dead connection %d from pool",
183
                         id(r))
184
            r = None
185
            n += 1
186
            if n > RETRY_LIMIT:
187
                raise RuntimeError(("Could not get live connection from pool"
188
                                    "after %d retries." % RETRY_LIMIT))
189
    logger.debug("GOT: Got connection %d from pool", id(r))
190
    return r
191

  
192

  
193
def monkey_patch_psycopg2():
194
    """Monkey-patch psycopg2's connect(), to retrieve connections from a pool.
195

  
196
    To enable pooling, you need to pass a synnefo_poolsize argument
197
    inside psycopg2's connection options.
198

  
199
    """
200

  
201
    if hasattr(psycopg2, '_original_connect'):
202
        return
203

  
204
    psycopg2._original_connect = psycopg2.connect
205
    psycopg2.connect = _pooled_connect
b/snf-common/synnefo/lib/db/psyco_gevent.py
1
# Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
2
# and licensed under the MIT license:
3
#
4
# Permission is hereby granted, free of charge, to any person obtaining a copy
5
# of this software and associated documentation files (the "Software"), to deal
6
# in the Software without restriction, including without limitation the rights
7
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
# copies of the Software, and to permit persons to whom the Software is
9
# furnished to do so, subject to the following conditions:
10
#
11
# The above copyright notice and this permission notice shall be included in
12
# all copies or substantial portions of the Software.
13
#
14
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
# THE SOFTWARE.
21
"""A wait callback to allow psycopg2 cooperation with gevent.
22

  
23
Use `make_psycopg_green()` to enable gevent support in Psycopg.
24

  
25
"""
26

  
27
import psycopg2
28
from psycopg2 import extensions
29

  
30
from gevent.socket import wait_read, wait_write
31

  
32

  
33
def make_psycopg_green():
34
    """Configure Psycopg to be used with gevent in non-blocking way."""
35
    if not hasattr(extensions, 'set_wait_callback'):
36
        raise ImportError(
37
            "support for coroutines not available in this Psycopg version (%s)"
38
            % psycopg2.__version__)
39

  
40
    extensions.set_wait_callback(gevent_wait_callback)
41

  
42

  
43
def gevent_wait_callback(conn, timeout=None):
44
    """A wait callback useful to allow gevent to work with Psycopg."""
45
    while 1:
46
        state = conn.poll()
47
        if state == extensions.POLL_OK:
48
            break
49
        elif state == extensions.POLL_READ:
50
            wait_read(conn.fileno(), timeout=timeout)
51
        elif state == extensions.POLL_WRITE:
52
            wait_write(conn.fileno(), timeout=timeout)
53
        else:
54
            raise psycopg2.OperationalError("Bad result from poll: %r" % state)
/dev/null
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
#
34

  
35
import psycopg2
36
from synnefo.lib.pool import ObjectPool
37

  
38
import logging
39
logger = logging.getLogger(__name__)
40

  
41

  
42
# Quick-n-dirty logging setup to stderr
43

  
44
#LOG_FORMAT = "%(asctime)-15s %(levelname)-6s %(message)s"
45
#handler = logging.StreamHandler()
46
#handler = logging.FileHandler("/tmp/aa1")
47
#handler.setFormatter(logging.Formatter(LOG_FORMAT))
48
#logger.addHandler(handler)
49
#logger.setLevel(logging.DEBUG)
50
#logger.fatal("kalhmera1")
51
#import sys
52
#print >>sys.stderr, "logger is %d" % id(logger)
53

  
54
_pool_kwargs = None
55
_pool = None
56

  
57

  
58
# How many times to retry on getting a live connection
59
# from the pool, before giving up.
60
RETRY_LIMIT = 1000
61

  
62

  
63
class PooledConnection(object):
64
    """Thin wrapper around a psycopg2 connection for a pooled connection.
65

  
66
    It takes care to put itself back into the pool upon connection closure.
67

  
68
    """
69
    def __init__(self, pool, conn):
70
        #print >>sys.stderr,"LOGGER is %d" % id(logger)
71
        logger.fatal("INIT POOLED CONN: pool = %s, conn = %s", pool, conn)
72
        self._pool = pool
73
        self._conn = conn
74

  
75
    def close(self):
76
        logger.debug("CLOSE POOLED CONN: self._pool = %s", self._pool)
77
        if not self._pool:
78
            return
79

  
80
        pool = self._pool
81
        logger.debug("PUT POOLED CONN: About to return %s to the pool", self)
82
        pool.pool_put(self)
83
        logger.debug("FINISHED PUT POOLED CONN: Returned %s to the pool", self)
84

  
85
    def __getattr__(self, attr):
86
        """Proxy every other call to the real connection"""
87
        return getattr(self._conn, attr)
88

  
89
    def __setattr__(self, attr, val):
90
        if attr not in ("_pool", "_conn"):
91
            setattr(self._conn, attr, val)
92
        object.__setattr__(self, attr, val)
93

  
94
    #def __del__(self):
95
    #    pass
96
    #    print >>sys.stderr, "DELETED"
97

  
98

  
99
class Psycopg2ConnectionPool(ObjectPool):
100
    """A synnefo.lib.pool.ObjectPool of psycopg2 connections.
101

  
102
    Every connection knows how to return itself to the pool
103
    when it gets close()d.
104

  
105
    """
106

  
107
    def __init__(self, **kw):
108
        ObjectPool.__init__(self, size=kw["synnefo_poolsize"])
109
        kw.pop("synnefo_poolsize")
110
        self._connection_args = kw
111

  
112
    def _pool_create(self):
113
        logger.info("CREATE: About to get a new connection from psycopg2")
114
        conn = psycopg2._original_connect(**self._connection_args)
115
        logger.info("CREATED: Got connection %s from psycopg2", conn)
116
        return PooledConnection(self, conn)
117

  
118
    def _pool_cleanup(self, pooledconn):
119
        logger.debug("CLEANING, conn = %d", id(pooledconn))
120
        try:
121
            # Reset this connection before putting it back
122
            # into the pool
123
            cursor = pooledconn.cursor()
124
            cursor.execute("ABORT; RESET ALL")
125
        except psycopg2.Error:
126
            # Since we're not going to be putting the psycopg2 connection
127
            # back into the pool, close it uncoditionally.
128
            logger.error("DEAD connection, conn = %d, %s",
129
                         id(pooledconn), pooledconn)
130
            try:
131
                pooledconn._conn.close()
132
            except:
133
                pass
134
            return True
135
        return False
136

  
137

  
138
def _init_pool(kw):
139
    global _pool
140
    global _pool_kwargs
141

  
142
    _pool_kwargs = kw
143
    _pool = Psycopg2ConnectionPool(**kw)
144

  
145

  
146
def _get_pool(kw):
147
    if not _pool:
148
        logger.debug("POOLINIT: Initializing DB connection pool")
149
        _init_pool(kw)
150

  
151
    if _pool_kwargs != kw:
152
        raise NotImplementedError(("Requested pooled psycopg2 connection with "
153
                                   "args %s != %s." % (kw, _pool_kwargs)))
154
    return _pool
155

  
156

  
157
def _verify_connection(pooledconn):
158
    try:
159
        cursor = pooledconn.cursor()
160
        cursor.execute("SELECT 1")
161
    except psycopg2.Error:
162
        # The connection has died.
163
        pooledconn.close()
164
        return False
165
    return True
166

  
167

  
168
def _pooled_connect(**kw):
169
    poolsize = kw.get("synnefo_poolsize", 0)
170
    if not poolsize:
171
        kw.pop("synnefo_poolsize", None)
172
        return psycopg2._original_connect(**kw)
173

  
174
    pool = _get_pool(kw)
175
    logger.debug("GET: Pool: set: %d, semaphore: %d",
176
                 len(pool._set), pool._semaphore._Semaphore__value)
177
    r = None
178
    n = 0
179
    while not r:
180
        r = pool.pool_get()
181
        if not _verify_connection(r):
182
            logger.error("DEADCONNECTION: Got dead connection %d from pool",
183
                         id(r))
184
            r = None
185
            n += 1
186
            if n > RETRY_LIMIT:
187
                raise RuntimeError(("Could not get live connection from pool"
188
                                    "after %d retries." % RETRY_LIMIT))
189
    logger.debug("GOT: Got connection %d from pool", id(r))
190
    return r
191

  
192

  
193
def monkey_patch_psycopg2():
194
    """Monkey-patch psycopg2's connect(), to retrieve connections from a pool.
195

  
196
    To enable pooling, you need to pass a synnefo_poolsize argument
197
    inside psycopg2's connection options.
198

  
199
    """
200

  
201
    if hasattr(psycopg2, '_original_connect'):
202
        return
203

  
204
    psycopg2._original_connect = psycopg2.connect
205
    psycopg2.connect = _pooled_connect
/dev/null
1
# Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
2
# and licensed under the MIT license:
3
#
4
# Permission is hereby granted, free of charge, to any person obtaining a copy
5
# of this software and associated documentation files (the "Software"), to deal
6
# in the Software without restriction, including without limitation the rights
7
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
# copies of the Software, and to permit persons to whom the Software is
9
# furnished to do so, subject to the following conditions:
10
#
11
# The above copyright notice and this permission notice shall be included in
12
# all copies or substantial portions of the Software.
13
#
14
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
# THE SOFTWARE.
21
"""A wait callback to allow psycopg2 cooperation with gevent.
22

  
23
Use `make_psycopg_green()` to enable gevent support in Psycopg.
24

  
25
"""
26

  
27
import psycopg2
28
from psycopg2 import extensions
29

  
30
from gevent.socket import wait_read, wait_write
31

  
32

  
33
def make_psycopg_green():
34
    """Configure Psycopg to be used with gevent in non-blocking way."""
35
    if not hasattr(extensions, 'set_wait_callback'):
36
        raise ImportError(
37
            "support for coroutines not available in this Psycopg version (%s)"
38
            % psycopg2.__version__)
39

  
40
    extensions.set_wait_callback(gevent_wait_callback)
41

  
42

  
43
def gevent_wait_callback(conn, timeout=None):
44
    """A wait callback useful to allow gevent to work with Psycopg."""
45
    while 1:
46
        state = conn.poll()
47
        if state == extensions.POLL_OK:
48
            break
49
        elif state == extensions.POLL_READ:
50
            wait_read(conn.fileno(), timeout=timeout)
51
        elif state == extensions.POLL_WRITE:
52
            wait_write(conn.fileno(), timeout=timeout)
53
        else:
54
            raise psycopg2.OperationalError("Bad result from poll: %r" % state)

Also available in: Unified diff