root / snf-common / synnefo / lib / pool / __init__.py @ 5f6ad491
History | View | Annotate | Download (7.9 kB)
1 | 45e32a00 | Vangelis Koukis | # Copyright 2011-2012 GRNET S.A. All rights reserved.
|
---|---|---|---|
2 | 45e32a00 | Vangelis Koukis | #
|
3 | 45e32a00 | Vangelis Koukis | # Redistribution and use in source and binary forms, with or
|
4 | 45e32a00 | Vangelis Koukis | # without modification, are permitted provided that the following
|
5 | 45e32a00 | Vangelis Koukis | # conditions are met:
|
6 | 45e32a00 | Vangelis Koukis | #
|
7 | 45e32a00 | Vangelis Koukis | # 1. Redistributions of source code must retain the above
|
8 | 45e32a00 | Vangelis Koukis | # copyright notice, this list of conditions and the following
|
9 | 45e32a00 | Vangelis Koukis | # disclaimer.
|
10 | 45e32a00 | Vangelis Koukis | #
|
11 | 45e32a00 | Vangelis Koukis | # 2. Redistributions in binary form must reproduce the above
|
12 | 45e32a00 | Vangelis Koukis | # copyright notice, this list of conditions and the following
|
13 | 45e32a00 | Vangelis Koukis | # disclaimer in the documentation and/or other materials
|
14 | 45e32a00 | Vangelis Koukis | # provided with the distribution.
|
15 | 45e32a00 | Vangelis Koukis | #
|
16 | 45e32a00 | Vangelis Koukis | # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 | 45e32a00 | Vangelis Koukis | # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 | 45e32a00 | Vangelis Koukis | # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 | 45e32a00 | Vangelis Koukis | # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 | 45e32a00 | Vangelis Koukis | # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 | 45e32a00 | Vangelis Koukis | # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 | 45e32a00 | Vangelis Koukis | # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 | 45e32a00 | Vangelis Koukis | # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 | 45e32a00 | Vangelis Koukis | # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 | 45e32a00 | Vangelis Koukis | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 | 45e32a00 | Vangelis Koukis | # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 | 45e32a00 | Vangelis Koukis | # POSSIBILITY OF SUCH DAMAGE.
|
28 | 45e32a00 | Vangelis Koukis | #
|
29 | 45e32a00 | Vangelis Koukis | # The views and conclusions contained in the software and
|
30 | 45e32a00 | Vangelis Koukis | # documentation are those of the authors and should not be
|
31 | 45e32a00 | Vangelis Koukis | # interpreted as representing official policies, either expressed
|
32 | 45e32a00 | Vangelis Koukis | # or implied, of GRNET S.A.
|
33 | 45e32a00 | Vangelis Koukis | |
34 | 45e32a00 | Vangelis Koukis | |
35 | 45e32a00 | Vangelis Koukis | """Classes to support pools of arbitrary objects.
|
36 | 45e32a00 | Vangelis Koukis |
|
37 | 45e32a00 | Vangelis Koukis | The :class:`ObjectPool` class in this module abstracts a pool
|
38 | 45e32a00 | Vangelis Koukis | of arbitrary objects. Subclasses need to define the details regarding
|
39 | 45e32a00 | Vangelis Koukis | creation, destruction, allocation and release of their specific objects.
|
40 | 45e32a00 | Vangelis Koukis |
|
41 | 45e32a00 | Vangelis Koukis | """
|
42 | 45e32a00 | Vangelis Koukis | |
43 | 45e32a00 | Vangelis Koukis | # This should work under gevent, because gevent monkey patches 'threading'
|
44 | 45e32a00 | Vangelis Koukis | # if not, we can detect if running under gevent, e.g. using
|
45 | 45e32a00 | Vangelis Koukis | # if 'gevent' in sys.modules:
|
46 | 45e32a00 | Vangelis Koukis | # from gevent.coros import Semaphore
|
47 | 45e32a00 | Vangelis Koukis | # else:
|
48 | 45e32a00 | Vangelis Koukis | # from threading import Semaphore
|
49 | 45e32a00 | Vangelis Koukis | from threading import Semaphore, Lock |
50 | 45e32a00 | Vangelis Koukis | |
51 | 45e32a00 | Vangelis Koukis | |
52 | a8935947 | Georgios D. Tsoukalas | __all__ = [ 'ObjectPool', 'ObjectPoolError', |
53 | a8935947 | Georgios D. Tsoukalas | 'PoolLimitError', 'PoolVerificationError' ] |
54 | 45e32a00 | Vangelis Koukis | |
55 | 68453d22 | Christos Stavrakakis | import logging |
56 | 68453d22 | Christos Stavrakakis | log = logging.getLogger(__name__) |
57 | 68453d22 | Christos Stavrakakis | |
58 | 45e32a00 | Vangelis Koukis | |
59 | 3447b13d | Vangelis Koukis | class ObjectPoolError(Exception): |
60 | 3447b13d | Vangelis Koukis | pass
|
61 | 3447b13d | Vangelis Koukis | |
62 | a8935947 | Georgios D. Tsoukalas | class PoolLimitError(ObjectPoolError): |
63 | a8935947 | Georgios D. Tsoukalas | pass
|
64 | 3447b13d | Vangelis Koukis | |
65 | a8935947 | Georgios D. Tsoukalas | class PoolVerificationError(ObjectPoolError): |
66 | 3447b13d | Vangelis Koukis | pass
|
67 | 3447b13d | Vangelis Koukis | |
68 | 3447b13d | Vangelis Koukis | |
69 | 45e32a00 | Vangelis Koukis | class ObjectPool(object): |
70 | a8935947 | Georgios D. Tsoukalas | """Generic Object Pool.
|
71 | a8935947 | Georgios D. Tsoukalas |
|
72 | a8935947 | Georgios D. Tsoukalas | The pool consists of an object set and an allocation semaphore.
|
73 | a8935947 | Georgios D. Tsoukalas |
|
74 | a8935947 | Georgios D. Tsoukalas | pool_get() gets an allocation from the semaphore
|
75 | a8935947 | Georgios D. Tsoukalas | and an object from the pool set.
|
76 | a8935947 | Georgios D. Tsoukalas |
|
77 | a8935947 | Georgios D. Tsoukalas | pool_put() releases an allocation to the semaphore
|
78 | a8935947 | Georgios D. Tsoukalas | and puts an object back to the pool set.
|
79 | a8935947 | Georgios D. Tsoukalas |
|
80 | a8935947 | Georgios D. Tsoukalas | Subclasses must implement these thread-safe hooks:
|
81 | a8935947 | Georgios D. Tsoukalas | _pool_create()
|
82 | a8935947 | Georgios D. Tsoukalas | is used as a subclass hook to auto-create new objects in pool_get().
|
83 | a8935947 | Georgios D. Tsoukalas | _pool_verify()
|
84 | a8935947 | Georgios D. Tsoukalas | verifies objects before they are returned by pool_get()
|
85 | a8935947 | Georgios D. Tsoukalas | _pool_cleanup()
|
86 | a8935947 | Georgios D. Tsoukalas | cleans up and verifies objects before their return by pool_put().
|
87 | a8935947 | Georgios D. Tsoukalas |
|
88 | a8935947 | Georgios D. Tsoukalas | While allocations are strictly accounted for and limited by
|
89 | a8935947 | Georgios D. Tsoukalas | the semaphore, objects are expendable:
|
90 | a8935947 | Georgios D. Tsoukalas |
|
91 | a8935947 | Georgios D. Tsoukalas | The hook provider and the caller are solely responsible for object handling.
|
92 | a8935947 | Georgios D. Tsoukalas | pool_get() may create an object if there is none in the pool set.
|
93 | a8935947 | Georgios D. Tsoukalas | pool_get() may return no object, leaving object creation to the caller.
|
94 | a8935947 | Georgios D. Tsoukalas | pool_put() may return no object to the pool set.
|
95 | a8935947 | Georgios D. Tsoukalas | Objects to pool_put() to the pool need not be those from pool_get().
|
96 | a8935947 | Georgios D. Tsoukalas | Objects to pool_get() need not be those from pool_put().
|
97 | a8935947 | Georgios D. Tsoukalas |
|
98 | a8935947 | Georgios D. Tsoukalas |
|
99 | a8935947 | Georgios D. Tsoukalas | Callers beware:
|
100 | a8935947 | Georgios D. Tsoukalas | The pool limit size must be greater than the total working set of objects,
|
101 | a8935947 | Georgios D. Tsoukalas | otherwise it will hang. When in doubt, use an impossibly large size limit.
|
102 | a8935947 | Georgios D. Tsoukalas | Since the pool grows on demand, this will not waste resources.
|
103 | a8935947 | Georgios D. Tsoukalas | However, in that case, the pool must not be used as a flow control device
|
104 | a8935947 | Georgios D. Tsoukalas | (i.e. relying on pool_get() blocking to stop threads),
|
105 | a8935947 | Georgios D. Tsoukalas | as the impossibly large pool size limit will defer blocking until too late.
|
106 | a8935947 | Georgios D. Tsoukalas |
|
107 | a8935947 | Georgios D. Tsoukalas | """
|
108 | 45e32a00 | Vangelis Koukis | def __init__(self, size=None): |
109 | 45e32a00 | Vangelis Koukis | try:
|
110 | 45e32a00 | Vangelis Koukis | self.size = int(size) |
111 | 45e32a00 | Vangelis Koukis | assert size >= 1 |
112 | 45e32a00 | Vangelis Koukis | except:
|
113 | 45e32a00 | Vangelis Koukis | raise ValueError("Invalid size for pool (positive integer " |
114 | 45e32a00 | Vangelis Koukis | "required): %r" % (size,))
|
115 | 45e32a00 | Vangelis Koukis | |
116 | a8935947 | Georgios D. Tsoukalas | self._semaphore = Semaphore(size) # Pool grows up to size limit |
117 | 45e32a00 | Vangelis Koukis | self._mutex = Lock() # Protect shared _set oject |
118 | 45e32a00 | Vangelis Koukis | self._set = set() |
119 | 68453d22 | Christos Stavrakakis | log.debug("Initialized pool %r", self) |
120 | 68453d22 | Christos Stavrakakis | |
121 | 68453d22 | Christos Stavrakakis | def __repr__(self): |
122 | 68453d22 | Christos Stavrakakis | return ("<pool %d: size=%d, len(_set)=%d, semaphore=%d>" % |
123 | 68453d22 | Christos Stavrakakis | (id(self), self.size, len(self._set), |
124 | 68453d22 | Christos Stavrakakis | self._semaphore._Semaphore__value))
|
125 | 45e32a00 | Vangelis Koukis | |
126 | a8935947 | Georgios D. Tsoukalas | def pool_get(self, blocking=True, timeout=None, create=True, verify=True): |
127 | 45e32a00 | Vangelis Koukis | """Get an object from the pool.
|
128 | 45e32a00 | Vangelis Koukis |
|
129 | a8935947 | Georgios D. Tsoukalas | Get a pool allocation and an object from the pool set.
|
130 | a8935947 | Georgios D. Tsoukalas | Raise PoolLimitError if the pool allocation limit has been reached.
|
131 | a8935947 | Georgios D. Tsoukalas | If the pool set is empty, create a new object (create==True),
|
132 | a8935947 | Georgios D. Tsoukalas | or return None (create==False) and let the caller create it.
|
133 | a8935947 | Georgios D. Tsoukalas | All objects returned (except None) are verified.
|
134 | 45e32a00 | Vangelis Koukis |
|
135 | 45e32a00 | Vangelis Koukis | """
|
136 | de67123e | Vangelis Koukis | # timeout argument only supported by gevent and py3k variants
|
137 | de67123e | Vangelis Koukis | # of Semaphore. acquire() will raise TypeError if timeout
|
138 | de67123e | Vangelis Koukis | # is specified but not supported by the underlying implementation.
|
139 | 68453d22 | Christos Stavrakakis | log.debug("GET: about to get object from pool %r", self) |
140 | de67123e | Vangelis Koukis | kw = {"blocking": blocking}
|
141 | de67123e | Vangelis Koukis | if timeout is not None: |
142 | de67123e | Vangelis Koukis | kw["timeout"] = timeout
|
143 | a8935947 | Georgios D. Tsoukalas | sema = self._semaphore
|
144 | a8935947 | Georgios D. Tsoukalas | r = sema.acquire(**kw) |
145 | 45e32a00 | Vangelis Koukis | if not r: |
146 | a8935947 | Georgios D. Tsoukalas | raise PoolLimitError()
|
147 | a8935947 | Georgios D. Tsoukalas | |
148 | a8935947 | Georgios D. Tsoukalas | try:
|
149 | a8935947 | Georgios D. Tsoukalas | created = 0
|
150 | a8935947 | Georgios D. Tsoukalas | while 1: |
151 | a8935947 | Georgios D. Tsoukalas | with self._mutex: |
152 | a8935947 | Georgios D. Tsoukalas | try:
|
153 | a8935947 | Georgios D. Tsoukalas | obj = self._set.pop()
|
154 | a8935947 | Georgios D. Tsoukalas | except KeyError: |
155 | a8935947 | Georgios D. Tsoukalas | obj = None
|
156 | a8935947 | Georgios D. Tsoukalas | if obj is None and create: |
157 | a8935947 | Georgios D. Tsoukalas | obj = self._pool_create()
|
158 | a8935947 | Georgios D. Tsoukalas | created = 1
|
159 | a8935947 | Georgios D. Tsoukalas | |
160 | a8935947 | Georgios D. Tsoukalas | if not self._pool_verify(obj): |
161 | a8935947 | Georgios D. Tsoukalas | if created:
|
162 | a8935947 | Georgios D. Tsoukalas | m = "Pool %r cannot verify new object %r" % (self, obj) |
163 | a8935947 | Georgios D. Tsoukalas | raise PoolVerificationError(m)
|
164 | a8935947 | Georgios D. Tsoukalas | continue
|
165 | a8935947 | Georgios D. Tsoukalas | break
|
166 | a8935947 | Georgios D. Tsoukalas | except:
|
167 | a8935947 | Georgios D. Tsoukalas | sema.release() |
168 | a8935947 | Georgios D. Tsoukalas | raise
|
169 | a8935947 | Georgios D. Tsoukalas | |
170 | a8935947 | Georgios D. Tsoukalas | # We keep _semaphore acquired, put() will release it
|
171 | 68453d22 | Christos Stavrakakis | log.debug("GOT: object %r from pool %r", obj, self) |
172 | 45e32a00 | Vangelis Koukis | return obj
|
173 | 45e32a00 | Vangelis Koukis | |
174 | 09cdd926 | Vangelis Koukis | def pool_put(self, obj): |
175 | 45e32a00 | Vangelis Koukis | """Put an object back into the pool.
|
176 | 45e32a00 | Vangelis Koukis |
|
177 | a8935947 | Georgios D. Tsoukalas | Release an allocation and return an object to the pool.
|
178 | a8935947 | Georgios D. Tsoukalas | If obj is None, or _pool_cleanup returns True,
|
179 | a8935947 | Georgios D. Tsoukalas | then the allocation is released,
|
180 | a8935947 | Georgios D. Tsoukalas | but no object returned to the pool set
|
181 | 45e32a00 | Vangelis Koukis |
|
182 | 45e32a00 | Vangelis Koukis | """
|
183 | 68453d22 | Christos Stavrakakis | log.debug("PUT-BEFORE: about to put object %r back to pool %r", obj, self) |
184 | a8935947 | Georgios D. Tsoukalas | if obj is not None and not self._pool_cleanup(obj): |
185 | e66c7993 | Georgios D. Tsoukalas | with self._mutex: |
186 | 3447b13d | Vangelis Koukis | self._set.add(obj)
|
187 | 45e32a00 | Vangelis Koukis | self._semaphore.release()
|
188 | 68453d22 | Christos Stavrakakis | log.debug("PUT-AFTER: finished putting object %r back to pool %r", obj, self) |
189 | 45e32a00 | Vangelis Koukis | |
190 | 09cdd926 | Vangelis Koukis | def _pool_create(self): |
191 | 45e32a00 | Vangelis Koukis | """Create a new object to be used with this pool.
|
192 | 45e32a00 | Vangelis Koukis |
|
193 | 45e32a00 | Vangelis Koukis | Create a new object to be used with this pool,
|
194 | 45e32a00 | Vangelis Koukis | should be overriden in subclasses.
|
195 | a8935947 | Georgios D. Tsoukalas | Must be thread-safe.
|
196 | a8935947 | Georgios D. Tsoukalas | """
|
197 | a8935947 | Georgios D. Tsoukalas | raise NotImplementedError |
198 | a8935947 | Georgios D. Tsoukalas | |
199 | a8935947 | Georgios D. Tsoukalas | def _pool_verify(self, obj): |
200 | a8935947 | Georgios D. Tsoukalas | """Verify an object after getting it from the pool.
|
201 | a8935947 | Georgios D. Tsoukalas |
|
202 | a8935947 | Georgios D. Tsoukalas | If it returns False, the object is discarded
|
203 | a8935947 | Georgios D. Tsoukalas | and another one is drawn from the pool.
|
204 | a8935947 | Georgios D. Tsoukalas | If the pool is empty, a new object is created.
|
205 | a8935947 | Georgios D. Tsoukalas | If the new object fails to verify, pool_get() will fail.
|
206 | a8935947 | Georgios D. Tsoukalas | Must be thread-safe.
|
207 | 45e32a00 | Vangelis Koukis |
|
208 | 45e32a00 | Vangelis Koukis | """
|
209 | 45e32a00 | Vangelis Koukis | raise NotImplementedError |
210 | 45e32a00 | Vangelis Koukis | |
211 | 09cdd926 | Vangelis Koukis | def _pool_cleanup(self, obj): |
212 | 45e32a00 | Vangelis Koukis | """Cleanup an object before being put back into the pool.
|
213 | 45e32a00 | Vangelis Koukis |
|
214 | 45e32a00 | Vangelis Koukis | Cleanup an object before it can be put back into the pull,
|
215 | 45e32a00 | Vangelis Koukis | ensure it is in a stable, reusable state.
|
216 | a8935947 | Georgios D. Tsoukalas | Must be thread-safe.
|
217 | 45e32a00 | Vangelis Koukis |
|
218 | 45e32a00 | Vangelis Koukis | """
|
219 | 45e32a00 | Vangelis Koukis | raise NotImplementedError |