Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / pool / __init__.py @ 5607fc6c

History | View | Annotate | Download (15.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34

    
35
"""Classes to support pools of arbitrary objects.
36

37
The :class:`ObjectPool` class in this module abstracts a pool
38
of arbitrary objects. Subclasses need to define the details regarding
39
creation, destruction, allocation and release of their specific objects.
40

41
"""
42

    
43
# This should work under gevent, because gevent monkey patches 'threading'
44
# if not, we can detect if running under gevent, e.g. using
45
# if 'gevent' in sys.modules:
46
#     from gevent.coros import Semaphore
47
# else:
48
#     from threading import Semaphore
49
from threading import Semaphore, Lock
50

    
51

    
52
__all__ = ['ObjectPool', 'ObjectPoolError',
53
           'PoolLimitError', 'PoolVerificationError']
54

    
55
import logging
56
log = logging.getLogger(__name__)
57

    
58

    
59
class ObjectPoolError(Exception):
60
    pass
61

    
62

    
63
class PoolLimitError(ObjectPoolError):
64
    pass
65

    
66

    
67
class PoolVerificationError(ObjectPoolError):
68
    pass
69

    
70

    
71
class ObjectPool(object):
72
    """Generic Object Pool.
73

74
    The pool consists of an object set and an allocation semaphore.
75

76
    pool_get() gets an allocation from the semaphore
77
               and an object from the pool set.
78

79
    pool_put() releases an allocation to the semaphore
80
               and puts an object back to the pool set.
81

82
    Subclasses must implement these thread-safe hooks:
83
    _pool_create()
84
            used as a subclass hook to auto-create new objects in pool_get().
85
    _pool_verify()
86
            verifies objects before they are returned by pool_get()
87
    _pool_cleanup()
88
            cleans up and verifies objects before their return by pool_put().
89

90
    While allocations are strictly accounted for and limited by
91
    the semaphore, objects are expendable:
92

93
    The hook provider and the caller are solely responsible for object
94
    handling.
95

96
    pool_get() may create an object if there is none in the pool set.
97
    pool_get() may return no object, leaving object creation to the caller.
98
    pool_put() may return no object to the pool set.
99
    Objects to pool_put() to the pool need not be those from pool_get().
100
    Objects to pool_get() need not be those from pool_put().
101

102

103
    Callers beware:
104
    The pool limit size must be greater than the total working set of objects,
105
    otherwise it will hang. When in doubt, use an impossibly large size limit.
106
    Since the pool grows on demand, this will not waste resources.
107
    However, in that case, the pool must not be used as a flow control device
108
    (i.e. relying on pool_get() blocking to stop threads),
109
    as the impossibly large pool size limit will defer blocking until too late.
110

111
    """
112
    def __init__(self, size=None):
113
        try:
114
            self.size = int(size)
115
            assert size >= 1
116
        except:
117
            raise ValueError("Invalid size for pool (positive integer "
118
                             "required): %r" % (size,))
119

    
120
        self._semaphore = Semaphore(size)  # Pool grows up to size limit
121
        self._mutex = Lock()  # Protect shared _set oject
122
        self._set = set()
123
        log.debug("Initialized pool %r", self)
124

    
125
    def __repr__(self):
126
        return ("<pool %d: size=%d, len(_set)=%d, semaphore=%d>" %
127
                (id(self), self.size, len(self._set),
128
                 self._semaphore._Semaphore__value))
129

    
130
    def pool_get(self, blocking=True, timeout=None, create=True, verify=True):
131
        """Get an object from the pool.
132

133
        Get a pool allocation and an object from the pool set.
134
        Raise PoolLimitError if the pool allocation limit has been reached.
135
        If the pool set is empty, create a new object (create==True),
136
        or return None (create==False) and let the caller create it.
137
        All objects returned (except None) are verified.
138

139
        """
140
        # timeout argument only supported by gevent and py3k variants
141
        # of Semaphore. acquire() will raise TypeError if timeout
142
        # is specified but not supported by the underlying implementation.
143
        log.debug("GET: about to get object from pool %r", self)
144
        kw = {"blocking": blocking}
145
        if timeout is not None:
146
            kw["timeout"] = timeout
147
        sema = self._semaphore
148
        r = sema.acquire(**kw)
149
        if not r:
150
            raise PoolLimitError()
151

    
152
        try:
153
            created = 0
154
            while 1:
155
                with self._mutex:
156
                    try:
157
                        obj = self._set.pop()
158
                    except KeyError:
159
                        obj = None
160
                if obj is None and create:
161
                    obj = self._pool_create()
162
                    created = 1
163

    
164
                if not self._pool_verify(obj):
165
                    if created:
166
                        m = "Pool %r cannot verify new object %r" % (self, obj)
167
                        raise PoolVerificationError(m)
168
                    continue
169
                break
170
        except:
171
            sema.release()
172
            raise
173

    
174
        # We keep _semaphore acquired, put() will release it
175
        log.debug("GOT: object %r from pool %r", obj, self)
176
        return obj
177

    
178
    def pool_put(self, obj):
179
        """Put an object back into the pool.
180

181
        Release an allocation and return an object to the pool.
182
        If obj is None, or _pool_cleanup returns True,
183
        then the allocation is released,
184
        but no object returned to the pool set
185

186
        """
187
        log.debug("PUT-BEFORE: about to put object %r back to pool %r",
188
                  obj, self)
189
        if obj is not None and not self._pool_cleanup(obj):
190
            with self._mutex:
191
                if obj in self._set:
192
                    log.warning("Object %r already in _set of pool %r",
193
                                obj, self)
194
                self._set.add(obj)
195
        self._semaphore.release()
196
        log.debug("PUT-AFTER: finished putting object %r back to pool %r",
197
                  obj, self)
198

    
199
    def pool_create_free(self):
200
        """Create a free new object that is not put into the pool.
201

202
        Just for convenience, let the users create objects with
203
        the exact same configuration as those that are used with the pool
204

205
        """
206
        obj = self._pool_create_free()
207
        return obj
208

    
209
    def _pool_create_free(self):
210
        """Create a free new object that is not put into the pool.
211

212
        This should be overriden by pool classes.
213
        Otherwise, it just calls _pool_create().
214

215
        """
216
        return self._pool_create()
217

    
218
    def _pool_create(self):
219
        """Create a new object to be used with this pool.
220

221
        Create a new object to be used with this pool,
222
        should be overriden in subclasses.
223
        Must be thread-safe.
224

225
        """
226
        raise NotImplementedError
227

    
228
    def _pool_verify(self, obj):
229
        """Verify an object after getting it from the pool.
230

231
        If it returns False, the object is discarded
232
        and another one is drawn from the pool.
233
        If the pool is empty, a new object is created.
234
        If the new object fails to verify, pool_get() will fail.
235
        Must be thread-safe.
236

237
        """
238
        raise NotImplementedError
239

    
240
    def _pool_cleanup(self, obj):
241
        """Cleanup an object before being put back into the pool.
242

243
        Cleanup an object before it can be put back into the pull,
244
        ensure it is in a stable, reusable state.
245
        Must be thread-safe.
246

247
        """
248
        raise NotImplementedError
249

    
250

    
251
class PooledObject(object):
252
    """Generic Object Pool context manager and pooled object proxy.
253

254
    The PooledObject instance acts as a context manager to
255
    be used in a with statement:
256

257
        with PooledObject(...) as obj:
258
            use(obj)
259

260
    The with block above is roughly equivalent to:
261

262
        pooled = PooledObject(...):
263
        try:
264
            obj = pooled.acquire()
265
            assert(obj is pooled.obj)
266
            use(obj)
267
        finally:
268
            pooled.release()
269

270
    After exiting the with block, or releasing,
271
    the code MUST not use the obj again in any way.
272

273
    """
274

    
275
    # NOTE: We need all definitions at class-level
276
    # to avoid infinite __gettatr__() recursion.
277
    # This is also true for subclasses.
278

    
279
    # NOTE: Typically you will only need to override
280
    #       __init__() and get_pool
281

    
282
    # Initialization. Do not customize.
283
    _pool_settings = None
284
    _pool_get_settings = None
285
    _pool_kwargs = None
286
    _pool = None
287
    obj = None
288

    
289
    #####################################################
290
    ### Subclass attribute customization begins here. ###
291

    
292
    _pool_log_prefix = "POOL"
293
    _pool_class = ObjectPool
294

    
295
    # default keyword args to pass to pool initialization
296
    _pool_default_settings = (
297
        ('size', 25),
298
    )
299

    
300
    # keyword args to pass to pool_get
301
    _pool_default_get_settings = (
302
        ('blocking', True),
303
        #('timeout', None),
304
        ('create', True),
305
        ('verify', True),
306
    )
307

    
308
    # behavior settings
309
    _pool_attach_context = False
310
    _pool_disable_after_release = True
311
    _pool_ignore_double_release = False
312

    
313
    ###  Subclass attribute customization ends here.  ###
314
    #####################################################
315

    
316
    def __init__(self, pool_settings=None, get_settings=None,
317
                 attach_context=None, disable_after_release=None,
318
                 ignore_double_release=None, **kwargs):
319
        """Initialize a PooledObject instance.
320

321
        Accept only keyword arguments.
322
        Some of them are filtered for this instance's configuration,
323
        and the rest are saved in ._pool_kwargs for later use.
324

325
        The filtered keywords are:
326

327
        pool_settings:  keyword args forwarded to pool instance initialization
328
                        in get_pool(), on top of the class defaults.
329
                        If not given, the remaining keyword args are
330
                        forwarded instead.
331

332
        get_settings:   keyword args forwarded to the pool's .pool_get() on top
333
                        of the class defaults.
334

335
        attach_context: boolean overriding the class default.
336
                        If True, after getting an object from the pool,
337
                        attach self onto it before returning it,
338
                        so that the context manager caller can have
339
                        access to the manager object within the with: block.
340

341
        disable_after_release:
342
                        boolean overriding the class default.
343
                        If True, the PooledObject will not allow a second
344
                        acquisition after the first release. For example,
345
                        the second with will raise an AssertionError:
346
                        manager = PooledObject()
347
                        with manager as c:
348
                            pass
349
                        with manager as c:
350
                            pass
351

352
        ignore_double_release:
353
                        boolean overriding the class default.
354
                        If True, the PooledObject will allow consecutive
355
                        calls to release the underlying pooled object.
356
                        Only the first one has an effect.
357
                        If False, an AssertionError is raised.
358

359
        """
360
        self._pool_kwargs = kwargs
361
        self._pool = None
362
        self.obj = None
363

    
364
        _get_settings = dict(self._pool_default_get_settings)
365
        if get_settings is not None:
366
            _get_settings.update(get_settings)
367
        self._pool_get_settings = _get_settings
368

    
369
        if attach_context is not None:
370
            self._pool_attach_context = attach_context
371

    
372
        if pool_settings is None:
373
            pool_settings = kwargs
374

    
375
        _pool_settings = dict(self._pool_default_settings)
376
        _pool_settings.update(**pool_settings)
377
        self._pool_settings = _pool_settings
378

    
379
    def get_pool(self):
380
        """Return a suitable pool object to work with.
381

382
        Called within .acquire(), it is meant to be
383
        overriden by sublasses, to create a new pool,
384
        or retrieve an existing one, based on the PooledObject
385
        initialization keywords stored in self._pool_kwargs.
386

387
        """
388
        pool = self._pool_class(**self._pool_settings)
389
        return pool
390

    
391
    ### Maybe overriding get_pool() and __init__() above is enough ###
392

    
393
    def __repr__(self):
394
        return ("<object %s of class %s: "
395
                "proxy for object (%r) in pool (%r)>" % (
396
                id(self), self.__class__.__name__,
397
                self.obj, self._pool))
398

    
399
    __str__ = __repr__
400

    
401
    ## Proxy the real object. Disabled until needed.
402
    ##
403
    ##def __getattr__(self, name):
404
    ##    return getattr(self.obj, name)
405

    
406
    ##def __setattr__(self, name, value):
407
    ##    if hasattr(self, name):
408
    ##        _setattr = super(PooledObject, self).__setattr__
409
    ##        _setattr(name, value)
410
    ##    else:
411
    ##        setattr(self.obj, name, value)
412

    
413
    ##def __delattr_(self, name):
414
    ##    _delattr = super(PooledObject, self).__delattr__
415
    ##    if hasattr(self, name):
416
    ##        _delattr(self, name)
417
    ##    else:
418
    ##        delattr(self.obj, name)
419

    
420
    def __enter__(self):
421
        return self.acquire()
422

    
423
    def __exit__(self, exc_type, exc_value, trace):
424
        return self.release()
425

    
426
    def acquire(self):
427
        log.debug("%s Acquiring (context: %r)", self._pool_log_prefix, self)
428
        pool = self._pool
429
        if pool is False:
430
            m = "%r: has been released. No further pool access allowed." % (
431
                self,)
432
            raise AssertionError(m)
433
        if pool is not None:
434
            m = "Double acquire in %r" % self
435
            raise AssertionError(m)
436

    
437
        pool = self.get_pool()
438
        self._pool = pool
439

    
440
        obj = pool.pool_get(**self._pool_get_settings)
441
        if self._pool_attach_context:
442
            obj._pool_context = self
443
        self.obj = obj
444
        log.debug("%s Acquired %r", self._pool_log_prefix, obj)
445
        return obj
446

    
447
    def release(self):
448
        log.debug("%s Releasing (context: %r)", self._pool_log_prefix, self)
449
        pool = self._pool
450
        if pool is None:
451
            m = "%r: no pool" % (self,)
452
            raise AssertionError(m)
453

    
454
        obj = self.obj
455
        if obj is None:
456
            if self._pool_ignore_double_release:
457
                return
458
            m = "%r: no object. Double release?" % (self,)
459
            raise AssertionError(m)
460

    
461
        pool.pool_put(obj)
462
        self.obj = None
463
        if self._pool_disable_after_release:
464
            self._pool = False