Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / pool / __init__.py @ 4ab1af1a

History | View | Annotate | Download (15.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34

    
35
"""Classes to support pools of arbitrary objects.
36

37
The :class:`ObjectPool` class in this module abstracts a pool
38
of arbitrary objects. Subclasses need to define the details regarding
39
creation, destruction, allocation and release of their specific objects.
40

41
"""
42

    
43
# This should work under gevent, because gevent monkey patches 'threading'
44
# if not, we can detect if running under gevent, e.g. using
45
# if 'gevent' in sys.modules:
46
#     from gevent.coros import Semaphore
47
# else:
48
#     from threading import Semaphore
49
from threading import Semaphore, Lock
50

    
51

    
52
__all__ = ['ObjectPool', 'ObjectPoolError',
53
           'PoolLimitError', 'PoolVerificationError']
54

    
55
import logging
56
log = logging.getLogger(__name__)
57

    
58

    
59
class ObjectPoolError(Exception):
60
    pass
61

    
62

    
63
class PoolLimitError(ObjectPoolError):
64
    pass
65

    
66

    
67
class PoolVerificationError(ObjectPoolError):
68
    pass
69

    
70

    
71
class ObjectPool(object):
72
    """Generic Object Pool.
73

74
    The pool consists of an object set and an allocation semaphore.
75

76
    pool_get() gets an allocation from the semaphore
77
               and an object from the pool set.
78

79
    pool_put() releases an allocation to the semaphore
80
               and puts an object back to the pool set.
81

82
    Subclasses must implement these thread-safe hooks:
83
    _pool_create()
84
            used as a subclass hook to auto-create new objects in pool_get().
85
    _pool_verify()
86
            verifies objects before they are returned by pool_get()
87
    _pool_cleanup()
88
            cleans up and verifies objects before their return by pool_put().
89

90
    While allocations are strictly accounted for and limited by
91
    the semaphore, objects are expendable:
92

93
    The hook provider and the caller are solely responsible for object
94
    handling.
95

96
    pool_get() may create an object if there is none in the pool set.
97
    pool_get() may return no object, leaving object creation to the caller.
98
    pool_put() may return no object to the pool set.
99
    Objects to pool_put() to the pool need not be those from pool_get().
100
    Objects to pool_get() need not be those from pool_put().
101

102

103
    Callers beware:
104
    The pool limit size must be greater than the total working set of objects,
105
    otherwise it will hang. When in doubt, use an impossibly large size limit.
106
    Since the pool grows on demand, this will not waste resources.
107
    However, in that case, the pool must not be used as a flow control device
108
    (i.e. relying on pool_get() blocking to stop threads),
109
    as the impossibly large pool size limit will defer blocking until too late.
110

111
    """
112
    def __init__(self, size=None):
113
        try:
114
            self.size = int(size)
115
            assert size >= 1
116
        except:
117
            raise ValueError("Invalid size for pool (positive integer "
118
                             "required): %r" % (size,))
119

    
120
        self._semaphore = Semaphore(size)  # Pool grows up to size limit
121
        self._mutex = Lock()  # Protect shared _set oject
122
        self._set = set()
123
        log.debug("Initialized pool %r", self)
124

    
125
    def __repr__(self):
126
        return ("<pool %d: size=%d, len(_set)=%d, semaphore=%d>" %
127
                (id(self), self.size, len(self._set),
128
                 self._semaphore._Semaphore__value))
129

    
130
    def pool_get(self, blocking=True, timeout=None, create=True, verify=True):
131
        """Get an object from the pool.
132

133
        Get a pool allocation and an object from the pool set.
134
        Raise PoolLimitError if the pool allocation limit has been reached.
135
        If the pool set is empty, create a new object (create==True),
136
        or return None (create==False) and let the caller create it.
137
        All objects returned (except None) are verified.
138

139
        """
140
        # timeout argument only supported by gevent and py3k variants
141
        # of Semaphore. acquire() will raise TypeError if timeout
142
        # is specified but not supported by the underlying implementation.
143
        log.debug("GET: about to get object from pool %r", self)
144
        kw = {"blocking": blocking}
145
        if timeout is not None:
146
            kw["timeout"] = timeout
147
        sema = self._semaphore
148
        r = sema.acquire(**kw)
149
        if not r:
150
            raise PoolLimitError()
151

    
152
        try:
153
            created = 0
154
            while 1:
155
                with self._mutex:
156
                    try:
157
                        obj = self._set.pop()
158
                    except KeyError:
159
                        obj = None
160
                if obj is None and create:
161
                    obj = self._pool_create()
162
                    created = 1
163

    
164
                if not self._pool_verify(obj):
165
                    if created:
166
                        m = "Pool %r cannot verify new object %r" % (self, obj)
167
                        raise PoolVerificationError(m)
168
                    continue
169
                break
170
        except:
171
            sema.release()
172
            raise
173

    
174
        # We keep _semaphore acquired, put() will release it
175
        log.debug("GOT: object %r from pool %r", obj, self)
176
        return obj
177

    
178
    def pool_put(self, obj):
179
        """Put an object back into the pool.
180

181
        Release an allocation and return an object to the pool.
182
        If obj is None, or _pool_cleanup returns True,
183
        then the allocation is released,
184
        but no object returned to the pool set
185

186
        """
187
        log.debug("PUT-BEFORE: about to put object %r back to pool %r",
188
                  obj, self)
189
        if obj is not None and not self._pool_cleanup(obj):
190
            with self._mutex:
191
                if obj in self._set:
192
                    log.warning("Object %r already in _set of pool %r",
193
                                obj, self)
194
                self._set.add(obj)
195
        self._semaphore.release()
196
        log.debug("PUT-AFTER: finished putting object %r back to pool %r",
197
                  obj, self)
198

    
199
    def pool_create_free(self):
200
        """Create a free new object that is not put into the pool.
201

202
        Just for convenience, let the users create objects with
203
        the exact same configuration as those that are used with the pool
204

205
        """
206
        obj = self._pool_create_free()
207
        return obj
208

    
209
    def _pool_create_free(self):
210
        """Create a free new object that is not put into the pool.
211

212
        This should be overriden by pool classes.
213
        Otherwise, it just calls _pool_create().
214

215
        """
216
        return self._pool_create()
217

    
218
    def _pool_create(self):
219
        """Create a new object to be used with this pool.
220

221
        Create a new object to be used with this pool,
222
        should be overriden in subclasses.
223
        Must be thread-safe.
224

225
        """
226
        raise NotImplementedError
227

    
228
    def _pool_verify(self, obj):
229
        """Verify an object after getting it from the pool.
230

231
        If it returns False, the object is discarded
232
        and another one is drawn from the pool.
233
        If the pool is empty, a new object is created.
234
        If the new object fails to verify, pool_get() will fail.
235
        Must be thread-safe.
236

237
        """
238
        raise NotImplementedError
239

    
240
    def _pool_cleanup(self, obj):
241
        """Cleanup an object before being put back into the pool.
242

243
        Cleanup an object before it can be put back into the pull,
244
        ensure it is in a stable, reusable state.
245
        Must be thread-safe.
246

247
        """
248
        raise NotImplementedError
249

    
250

    
251
class PooledObject(object):
252
    """Generic Object Pool context manager and pooled object proxy.
253

254
    The PooledObject instance acts as a context manager to
255
    be used in a with statement:
256

257
        with PooledObject(...) as obj:
258
            use(obj)
259

260
    The with block above is roughly equivalent to:
261

262
        pooled = PooledObject(...):
263
        try:
264
            obj = pooled.acquire()
265
            assert(obj is pooled.obj)
266
            use(obj)
267
        finally:
268
            pooled.release()
269

270
    After exiting the with block, or releasing,
271
    the code MUST not use the obj again in any way.
272

273
    """
274

    
275
    # NOTE: We need all definitions at class-level
276
    # to avoid infinite __gettatr__() recursion.
277
    # This is also true for subclasses.
278

    
279
    # NOTE: Typically you will only need to override
280
    #       __init__() and get_pool
281

    
282
    # Initialization. Do not customize.
283
    _pool_settings = None
284
    _pool_get_settings = None
285
    _pool_kwargs = None
286
    _pool = None
287
    obj = None
288

    
289
    #####################################################
290
    ### Subclass attribute customization begins here. ###
291

    
292
    _pool_log_prefix = "POOL"
293
    _pool_class = ObjectPool
294

    
295
    # default keyword args to pass to pool initialization
296
    _pool_default_settings = (
297
            ('size', 25),
298
        )
299

    
300
    # keyword args to pass to pool_get
301
    _pool_default_get_settings = (
302
            ('blocking', True),
303
            #('timeout', None),
304
            ('create', True),
305
            ('verify', True),
306
        )
307

    
308
    # behavior settings
309
    _pool_attach_context = False
310
    _pool_disable_after_release = True
311
    _pool_ignore_double_release = False
312

    
313
    ###  Subclass attribute customization ends here.  ###
314
    #####################################################
315

    
316
    def __init__(self, pool_settings=None,
317
                       get_settings=None,
318
                       attach_context=None,
319
                       disable_after_release=None,
320
                       ignore_double_release=None,
321
                       **kwargs):
322
        """Initialize a PooledObject instance.
323

324
        Accept only keyword arguments.
325
        Some of them are filtered for this instance's configuration,
326
        and the rest are saved in ._pool_kwargs for later use.
327

328
        The filtered keywords are:
329

330
        pool_settings:  keyword args forwarded to pool instance initialization
331
                        in get_pool(), on top of the class defaults.
332
                        If not given, the remaining keyword args are
333
                        forwarded instead.
334

335
        get_settings:   keyword args forwarded to the pool's .pool_get() on top
336
                        of the class defaults.
337

338
        attach_context: boolean overriding the class default.
339
                        If True, after getting an object from the pool,
340
                        attach self onto it before returning it,
341
                        so that the context manager caller can have
342
                        access to the manager object within the with: block.
343

344
        disable_after_release:
345
                        boolean overriding the class default.
346
                        If True, the PooledObject will not allow a second
347
                        acquisition after the first release. For example,
348
                        the second with will raise an AssertionError:
349
                        manager = PooledObject()
350
                        with manager as c:
351
                            pass
352
                        with manager as c:
353
                            pass
354

355
        ignore_double_release:
356
                        boolean overriding the class default.
357
                        If True, the PooledObject will allow consecutive
358
                        calls to release the underlying pooled object.
359
                        Only the first one has an effect.
360
                        If False, an AssertionError is raised.
361

362
        """
363
        self._pool_kwargs = kwargs
364
        self._pool = None
365
        self.obj = None
366

    
367
        _get_settings = dict(self._pool_default_get_settings)
368
        if get_settings is not None:
369
            _get_settings.update(get_settings)
370
        self._pool_get_settings = _get_settings
371

    
372
        if attach_context is not None:
373
            self._pool_attach_context = attach_context
374

    
375
        if pool_settings is None:
376
            pool_settings = kwargs
377

    
378
        _pool_settings = dict(self._pool_default_settings)
379
        _pool_settings.update(**pool_settings)
380
        self._pool_settings = _pool_settings
381

    
382
    def get_pool(self):
383
        """Return a suitable pool object to work with.
384

385
        Called within .acquire(), it is meant to be
386
        overriden by sublasses, to create a new pool,
387
        or retrieve an existing one, based on the PooledObject
388
        initialization keywords stored in self._pool_kwargs.
389

390
        """
391
        pool = self._pool_class(**self._pool_settings)
392
        return pool
393

    
394
    ### Maybe overriding get_pool() and __init__() above is enough ###
395

    
396
    def __repr__(self):
397
        return ("<object %s of class %s: "
398
                "proxy for object (%r) in pool (%r)>" % (
399
                id(self), self.__class__.__name__,
400
                self.obj, self._pool))
401

    
402
    __str__ = __repr__
403

    
404
    ## Proxy the real object. Disabled until needed.
405
    ##
406
    ##def __getattr__(self, name):
407
    ##    return getattr(self.obj, name)
408

    
409
    ##def __setattr__(self, name, value):
410
    ##    if hasattr(self, name):
411
    ##        _setattr = super(PooledObject, self).__setattr__
412
    ##        _setattr(name, value)
413
    ##    else:
414
    ##        setattr(self.obj, name, value)
415

    
416
    ##def __delattr_(self, name):
417
    ##    _delattr = super(PooledObject, self).__delattr__
418
    ##    if hasattr(self, name):
419
    ##        _delattr(self, name)
420
    ##    else:
421
    ##        delattr(self.obj, name)
422

    
423
    def __enter__(self):
424
        return self.acquire()
425

    
426
    def __exit__(self, exc_type, exc_value, trace):
427
        return self.release()
428

    
429
    def acquire(self):
430
        log.debug("%s Acquiring (context: %r)", self._pool_log_prefix, self)
431
        pool = self._pool
432
        if pool is False:
433
            m = "%r: has been released. No further pool access allowed." % (
434
                self,)
435
            raise AssertionError(m)
436
        if pool is not None:
437
            m = "Double acquire in %r" % self
438
            raise AssertionError(m)
439

    
440
        pool = self.get_pool()
441
        self._pool = pool
442

    
443
        obj = pool.pool_get(**self._pool_get_settings)
444
        if self._pool_attach_context:
445
            obj._pool_context = self
446
        self.obj = obj
447
        log.debug("%s Acquired %r", self._pool_log_prefix, obj)
448
        return obj
449

    
450
    def release(self):
451
        log.debug("%s Releasing (context: %r)", self._pool_log_prefix, self)
452
        pool = self._pool
453
        if pool is None:
454
            m = "%r: no pool" % (self,)
455
            raise AssertionError(m)
456

    
457
        obj = self.obj
458
        if obj is None:
459
            if self._pool_ignore_double_release:
460
                return
461
            m = "%r: no object. Double release?" % (self,)
462
            raise AssertionError(m)
463

    
464
        pool.pool_put(obj)
465
        self.obj = None
466
        if self._pool_disable_after_release:
467
            self._pool = False