Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / pool / __init__.py @ 1c65202f

History | View | Annotate | Download (8.1 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34

    
35
"""Classes to support pools of arbitrary objects.
36

37
The :class:`ObjectPool` class in this module abstracts a pool
38
of arbitrary objects. Subclasses need to define the details regarding
39
creation, destruction, allocation and release of their specific objects.
40

41
"""
42

    
43
# This should work under gevent, because gevent monkey patches 'threading'
44
# if not, we can detect if running under gevent, e.g. using
45
# if 'gevent' in sys.modules:
46
#     from gevent.coros import Semaphore
47
# else:
48
#     from threading import Semaphore
49
from threading import Semaphore, Lock
50

    
51

    
52
__all__ = ['ObjectPool', 'ObjectPoolError',
53
           'PoolLimitError', 'PoolVerificationError']
54

    
55
import logging
56
log = logging.getLogger(__name__)
57

    
58

    
59
class ObjectPoolError(Exception):
60
    pass
61

    
62

    
63
class PoolLimitError(ObjectPoolError):
64
    pass
65

    
66

    
67
class PoolVerificationError(ObjectPoolError):
68
    pass
69

    
70

    
71
class ObjectPool(object):
72
    """Generic Object Pool.
73

74
    The pool consists of an object set and an allocation semaphore.
75

76
    pool_get() gets an allocation from the semaphore
77
               and an object from the pool set.
78

79
    pool_put() releases an allocation to the semaphore
80
               and puts an object back to the pool set.
81

82
    Subclasses must implement these thread-safe hooks:
83
    _pool_create()
84
            used as a subclass hook to auto-create new objects in pool_get().
85
    _pool_verify()
86
            verifies objects before they are returned by pool_get()
87
    _pool_cleanup()
88
            cleans up and verifies objects before their return by pool_put().
89

90
    While allocations are strictly accounted for and limited by
91
    the semaphore, objects are expendable:
92

93
    The hook provider and the caller are solely responsible for object
94
    handling.
95

96
    pool_get() may create an object if there is none in the pool set.
97
    pool_get() may return no object, leaving object creation to the caller.
98
    pool_put() may return no object to the pool set.
99
    Objects to pool_put() to the pool need not be those from pool_get().
100
    Objects to pool_get() need not be those from pool_put().
101

102

103
    Callers beware:
104
    The pool limit size must be greater than the total working set of objects,
105
    otherwise it will hang. When in doubt, use an impossibly large size limit.
106
    Since the pool grows on demand, this will not waste resources.
107
    However, in that case, the pool must not be used as a flow control device
108
    (i.e. relying on pool_get() blocking to stop threads),
109
    as the impossibly large pool size limit will defer blocking until too late.
110

111
    """
112
    def __init__(self, size=None):
113
        try:
114
            self.size = int(size)
115
            assert size >= 1
116
        except:
117
            raise ValueError("Invalid size for pool (positive integer "
118
                             "required): %r" % (size,))
119

    
120
        self._semaphore = Semaphore(size)  # Pool grows up to size limit
121
        self._mutex = Lock()  # Protect shared _set oject
122
        self._set = set()
123
        log.debug("Initialized pool %r", self)
124

    
125
    def __repr__(self):
126
        return ("<pool %d: size=%d, len(_set)=%d, semaphore=%d>" %
127
                (id(self), self.size, len(self._set),
128
                 self._semaphore._Semaphore__value))
129

    
130
    def pool_get(self, blocking=True, timeout=None, create=True, verify=True):
131
        """Get an object from the pool.
132

133
        Get a pool allocation and an object from the pool set.
134
        Raise PoolLimitError if the pool allocation limit has been reached.
135
        If the pool set is empty, create a new object (create==True),
136
        or return None (create==False) and let the caller create it.
137
        All objects returned (except None) are verified.
138

139
        """
140
        # timeout argument only supported by gevent and py3k variants
141
        # of Semaphore. acquire() will raise TypeError if timeout
142
        # is specified but not supported by the underlying implementation.
143
        log.debug("GET: about to get object from pool %r", self)
144
        kw = {"blocking": blocking}
145
        if timeout is not None:
146
            kw["timeout"] = timeout
147
        sema = self._semaphore
148
        r = sema.acquire(**kw)
149
        if not r:
150
            raise PoolLimitError()
151

    
152
        try:
153
            created = 0
154
            while 1:
155
                with self._mutex:
156
                    try:
157
                        obj = self._set.pop()
158
                    except KeyError:
159
                        obj = None
160
                if obj is None and create:
161
                    obj = self._pool_create()
162
                    created = 1
163

    
164
                if not self._pool_verify(obj):
165
                    if created:
166
                        m = "Pool %r cannot verify new object %r" % (self, obj)
167
                        raise PoolVerificationError(m)
168
                    continue
169
                break
170
        except:
171
            sema.release()
172
            raise
173

    
174
        # We keep _semaphore acquired, put() will release it
175
        log.debug("GOT: object %r from pool %r", obj, self)
176
        return obj
177

    
178
    def pool_put(self, obj):
179
        """Put an object back into the pool.
180

181
        Release an allocation and return an object to the pool.
182
        If obj is None, or _pool_cleanup returns True,
183
        then the allocation is released,
184
        but no object returned to the pool set
185

186
        """
187
        log.debug("PUT-BEFORE: about to put object %r back to pool %r",
188
                  obj, self)
189
        if obj is not None and not self._pool_cleanup(obj):
190
            with self._mutex:
191
                if obj in self._set:
192
                    log.warning("Object %r already in _set of pool %r",
193
                                obj, self)
194
                self._set.add(obj)
195
        self._semaphore.release()
196
        log.debug("PUT-AFTER: finished putting object %r back to pool %r",
197
                  obj, self)
198

    
199
    def _pool_create(self):
200
        """Create a new object to be used with this pool.
201

202
        Create a new object to be used with this pool,
203
        should be overriden in subclasses.
204
        Must be thread-safe.
205
        """
206
        raise NotImplementedError
207

    
208
    def _pool_verify(self, obj):
209
        """Verify an object after getting it from the pool.
210

211
        If it returns False, the object is discarded
212
        and another one is drawn from the pool.
213
        If the pool is empty, a new object is created.
214
        If the new object fails to verify, pool_get() will fail.
215
        Must be thread-safe.
216

217
        """
218
        raise NotImplementedError
219

    
220
    def _pool_cleanup(self, obj):
221
        """Cleanup an object before being put back into the pool.
222

223
        Cleanup an object before it can be put back into the pull,
224
        ensure it is in a stable, reusable state.
225
        Must be thread-safe.
226

227
        """
228
        raise NotImplementedError