Pool connections
authorStavros Sachtouris <saxtouri@admin.grnet.gr>
Mon, 10 Sep 2012 09:18:29 +0000 (12:18 +0300)
committerStavros Sachtouris <saxtouri@admin.grnet.gr>
Mon, 10 Sep 2012 09:18:29 +0000 (12:18 +0300)
Requests also block if pool is full (used to be just for
responses)

kamaki/cli/commands/pithos_cli.py
kamaki/clients/connection/kamakicon.py [new file with mode: 0644]
kamaki/clients/connection/pool/__init__.py [new file with mode: 0644]
kamaki/clients/connection/pool/http.py [new file with mode: 0644]
kamaki/clients/connection/pool/tests.py [new file with mode: 0755]
kamaki/clients/connection/request.py

index de84e79..fa16a83 100644 (file)
@@ -557,6 +557,7 @@ class store_upload(_store_container_command):
                     public=getattr(self.args, 'public'))
         except ClientError as err:
             raiseCLIError(err)
+        print 'Upload completed'
 
 @command()
 class store_download(_store_container_command):
diff --git a/kamaki/clients/connection/kamakicon.py b/kamaki/clients/connection/kamakicon.py
new file mode 100644 (file)
index 0000000..21626e8
--- /dev/null
@@ -0,0 +1,143 @@
+# Copyright 2012 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+#   1. Redistributions of source code must retain the above
+#      copyright notice, self.list of conditions and the following
+#      disclaimer.
+#
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, self.list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from urlparse import urlparse
+from .pool.http import get_http_connection
+from . import HTTPConnection, HTTPResponse, HTTPConnectionError
+
+
+from time import sleep
+from httplib import ResponseNotReady
+
+class KamakiHTTPResponse(HTTPResponse):
+
+    def _get_response(self):
+        print('KamakiHTTPResponse:should I get response?')
+        if self.prefetched:
+            print('\tKamakiHTTPResponse: no, I have already done that before')
+            return
+        print('\tKamakiHTTPResponse: yes, pls')
+        r = self.request.getresponse()
+        self.prefetched = True
+        headers = {}
+        for k,v in r.getheaders():
+            headers.update({k:v})
+        self.headers = headers
+        self.content = r.read(r.length)
+        self.status_code = r.status
+        self.status = r.reason
+        print('KamakiHTTPResponse: Niiiiice')
+
+    @property 
+    def text(self):
+        _get_response()
+        return self._content
+    @text.setter
+    def test(self, v):
+        pass
+
+    @property 
+    def json(self):
+        _get_response()
+        from json import loads
+        try:
+            return loads(self._content)
+        except ValueError as err:
+            HTTPConnectionError('Response not formated in JSON', details=unicode(err), status=702)
+    @json.setter
+    def json(self, v):
+        pass
+
+class KamakiHTTPConnection(HTTPConnection):
+
+    url         =   None
+    scheme      =   None
+    netloc      =   None
+    method      =   None
+    data        =   None
+    headers     =   None
+
+    scheme_ports = {
+            'http':     '80',
+            'https':    '443',
+    }
+
+    def _load_connection_settings(self, url=None, scheme=None, params=None, headers=None, host=None,
+        port=None, method=None):
+        if params is not None:
+            self.params = params
+        if headers is not None:
+            self.headers = headers
+
+        if url is None:
+            url = self.url
+        if host is None or scheme is None:
+            p = urlparse(url)
+            netloc = p.netloc
+            if not netloc:
+                netloc = 'localhost'
+            scheme = p.scheme
+            if not scheme:
+                scheme = 'http'
+            param_str = ''
+            for i,(key, val) in enumerate(self.params.items()):
+                param_str = ('?' if i == 0 else '&') + unicode(key) 
+                if val is not None:
+                    param_str+= '='+unicode(val)
+            url = p.path + param_str
+        else:
+            host = host
+            port = port if port is not None else self.scheme_ports[scheme]
+            #NOTE: we force host:port as canonical form,
+            #      lest we have a cache miss 'host' vs 'host:80'
+            netloc = "%s%s" % (host, port)
+
+        self.netloc = netloc
+        self.url = url #if url in (None, '') or url[0] != '/' else url[1:]
+        self.scheme = scheme
+
+        if method is not None:
+            self.method = method
+
+    def perform_request(self, url=None, params=None, headers=None, method=None, host=None,
+        port=None, data=None):
+        self._load_connection_settings(url=url, params=params, headers=headers, host=host,
+            port=port, method=method)
+        print('---> %s %s %s %s %s'%(self.method, self.scheme, self.netloc, self.url, self.headers))
+        conn = get_http_connection(netloc=self.netloc, scheme=self.scheme)
+        try:
+            conn.request(self.method, self.url, headers=self.headers, body=data)
+        except:
+            conn.close()
+            raise
+        return KamakiHTTPResponse(conn)
diff --git a/kamaki/clients/connection/pool/__init__.py b/kamaki/clients/connection/pool/__init__.py
new file mode 100644 (file)
index 0000000..1b90938
--- /dev/null
@@ -0,0 +1,135 @@
+# Copyright 2011-2012 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+#
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+
+"""Classes to support pools of arbitrary objects.
+
+The :class:`ObjectPool` class in this module abstracts a pool
+of arbitrary objects. Subclasses need to define the details regarding
+creation, destruction, allocation and release of their specific objects.
+
+"""
+
+# This should work under gevent, because gevent monkey patches 'threading'
+# if not, we can detect if running under gevent, e.g. using
+# if 'gevent' in sys.modules:
+#     from gevent.coros import Semaphore
+# else:
+#     from threading import Semaphore
+from threading import Semaphore, Lock
+
+
+__all__ = ['ObjectPool', 'ObjectPoolError', 'PoolEmptyError']
+
+
+class ObjectPoolError(Exception):
+    pass
+
+
+class PoolEmptyError(ObjectPoolError):
+    pass
+
+
+class ObjectPool(object):
+    def __init__(self, size=None):
+        try:
+            self.size = int(size)
+            assert size >= 1
+        except:
+            raise ValueError("Invalid size for pool (positive integer "
+                             "required): %r" % (size,))
+
+        self._semaphore = Semaphore(size)  # Pool grows up to size
+        self._mutex = Lock()  # Protect shared _set oject
+        self._set = set()
+
+    def pool_get(self, blocking=True, timeout=None, create=True):
+        """Get an object from the pool.
+
+        Get an object from the pool. By default (create=True), create a new
+        object if the pool has not reached its maximum size yet. If
+        create == False, the caller is responsible for creating the object and
+        put()ting it back into the pool when done.
+
+        """
+        # timeout argument only supported by gevent and py3k variants
+        # of Semaphore. acquire() will raise TypeError if timeout
+        # is specified but not supported by the underlying implementation.
+        kw = {"blocking": blocking}
+        if timeout is not None:
+            kw["timeout"] = timeout
+        r = self._semaphore.acquire(**kw)
+        if not r:
+            raise PoolEmptyError()
+        with self._mutex:
+            try:
+                try:
+                    obj = self._set.pop()
+                except KeyError:
+                    obj = self._pool_create() if create else None
+            except:
+                self._semaphore.release()
+                raise
+        # We keep _semaphore locked, put() will release it
+        return obj
+
+    def pool_put(self, obj):
+        """Put an object back into the pool.
+
+        Return an object to the pool, for subsequent retrieval
+        by pool_get() calls. If _pool_cleanup() returns True,
+        the object has died and is not put back into self._set.
+
+        """
+        with self._mutex:
+            if not self._pool_cleanup(obj):
+                self._set.add(obj)
+        self._semaphore.release()
+
+    def _pool_create(self):
+        """Create a new object to be used with this pool.
+
+        Create a new object to be used with this pool,
+        should be overriden in subclasses.
+
+        """
+        raise NotImplementedError
+
+    def _pool_cleanup(self, obj):
+        """Cleanup an object before being put back into the pool.
+
+        Cleanup an object before it can be put back into the pull,
+        ensure it is in a stable, reusable state.
+
+        """
+        raise NotImplementedError
diff --git a/kamaki/clients/connection/pool/http.py b/kamaki/clients/connection/pool/http.py
new file mode 100644 (file)
index 0000000..2992956
--- /dev/null
@@ -0,0 +1,122 @@
+
+from kamaki.clients.connection.pool import ObjectPool
+
+from httplib import (
+        HTTPConnection  as http_class,
+        HTTPSConnection as https_class,
+        HTTPException,
+        ResponseNotReady
+)
+
+from urlparse import urlparse
+from new import instancemethod
+
+
+_pools = {}
+pool_size = 8
+
+
+USAGE_LIMIT = 25
+RETRY_LIMIT = 100
+
+
+def init_http_pooling(size):
+    global pool_size
+    pool_size = size
+
+
+def put_http_connection(conn):
+    pool = conn._pool
+    if pool is None:
+        return
+    conn._pool = None
+    pool.pool_put(conn)
+
+
+class HTTPConnectionPool(ObjectPool):
+
+    _scheme_to_class = {
+            'http'  :   http_class,
+            'https' :   https_class,
+    }
+
+    def __init__(self, scheme, netloc, size=None):
+        ObjectPool.__init__(self, size=size)
+
+        connection_class = self._scheme_to_class.get(scheme, None)
+        if connection_class is None:
+            m = 'Unsupported scheme: %s' % (scheme,)
+            raise ValueError(m)
+
+        self.connection_class = connection_class
+        self.scheme = scheme
+        self.netloc = netloc
+
+    def _pool_create(self):
+        conn = self.connection_class(self.netloc)
+        conn._use_counter = USAGE_LIMIT
+        conn._pool = self
+        conn._real_close = conn.close
+        conn.close = instancemethod(put_http_connection,
+                                    conn, type(conn))
+        return conn
+
+    def _pool_cleanup(self, conn):
+        # every connection can be used a finite number of times
+        conn._use_counter -= 1
+
+        # see httplib source for connection states documentation
+        if conn._use_counter > 0 and conn._HTTPConnection__state == 'Idle':
+            try:
+                resp = conn.getresponse()
+            except ResponseNotReady:
+               return False
+
+        conn._real_close()
+        return True
+
+
+def _verify_connection(conn):
+    try:
+        conn.request("HEAD", "/")
+        conn.getresponse().read()
+    except HTTPException:
+        # The connection has died.
+        conn.close()
+        return False
+    return True
+
+
+def get_http_connection(netloc=None, scheme='http'):
+    if netloc is None:
+        m = "netloc cannot be None"
+        raise ValueError(m)
+    # does the pool need to be created?
+    if netloc not in _pools:
+        pool = HTTPConnectionPool(scheme, netloc, size=pool_size)
+        _pools[netloc] = pool
+
+    pool = _pools[netloc]
+
+    conn = None
+    n = 0
+    while conn is None:
+        conn = pool.pool_get()
+        conn._pool = pool
+        if not _verify_connection(conn):
+            # The connection has died, e.g., Keepalive expired
+            conn = None
+            n += 1
+            if n > RETRY_LIMIT:
+                m = ("Could not get live HTTP conn from pool"
+                     "after %d retries." % RETRY_LIMIT)
+                raise RuntimeError(m)
+    return conn
+
+def main():
+    #cpool = HTTPConnectionPool('https', 'pithos.okeanos.io/v1', size=8)
+    c = get_http_connection('pithos.okeanos.io', 'https')
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/kamaki/clients/connection/pool/tests.py b/kamaki/clients/connection/pool/tests.py
new file mode 100755 (executable)
index 0000000..64ba5ca
--- /dev/null
@@ -0,0 +1,209 @@
+#!/usr/bin/env python
+#
+# -*- coding: utf-8 -*-
+#
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+#
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+#
+#
+
+"""Unit Tests for the pool classes in synnefo.lib.pool
+
+Provides unit tests for the code implementing pool
+classes in the synnefo.lib.pool module.
+
+"""
+
+# Support running under a gevent-monkey-patched environment
+# if the "monkey" argument is specified in the command line.
+import sys
+if "monkey" in sys.argv:
+    from gevent import monkey
+    monkey.patch_all()
+    sys.argv.pop(sys.argv.index("monkey"))
+
+import sys
+import time
+import threading
+
+from . import ObjectPool, PoolEmptyError
+
+# Use backported unittest functionality if Python < 2.7
+try:
+    import unittest2 as unittest
+except ImportError:
+    if sys.version_info < (2, 7):
+        raise Exception("The unittest2 package is required for Python < 2.7")
+    import unittest
+
+
+class NumbersPool(ObjectPool):
+    max = 0
+
+    def _pool_create(self):
+        n = self.max
+        self.max += 1
+        return n
+
+    def _pool_cleanup(self, obj):
+        n = int(obj)
+        if n < 0:
+            return True
+        pass
+
+
+class ObjectPoolTestCase(unittest.TestCase):
+    def test_create_pool_requires_size(self):
+        """Test __init__() requires valid size argument"""
+        self.assertRaises(ValueError, ObjectPool)
+        self.assertRaises(ValueError, ObjectPool, size="size10")
+        self.assertRaises(ValueError, ObjectPool, size=0)
+        self.assertRaises(ValueError, ObjectPool, size=-1)
+
+    def test_create_pool(self):
+        """Test pool creation works"""
+        pool = ObjectPool(100)
+        self.assertEqual(pool.size, 100)
+
+    def test_get_not_implemented(self):
+        """Test pool_get() method not implemented in abstract class"""
+        pool = ObjectPool(100)
+        self.assertRaises(NotImplementedError, pool.pool_get)
+
+    def test_put_not_implemented(self):
+        """Test pool_put() method not implemented in abstract class"""
+        pool = ObjectPool(100)
+        self.assertRaises(NotImplementedError, pool.pool_put, None)
+
+
+class NumbersPoolTestCase(unittest.TestCase):
+    N = 1500
+    SEC = 0.5
+    maxDiff = None
+
+    def setUp(self):
+        self.numbers = NumbersPool(self.N)
+
+    def test_initially_empty(self):
+        """Test pool is empty upon creation"""
+        self.assertEqual(self.numbers._set, set([]))
+
+    def test_seq_allocate_all(self):
+        """Test allocation and deallocation of all pool objects"""
+        n = []
+        for _ in xrange(0, self.N):
+            n.append(self.numbers.pool_get())
+        self.assertEqual(n, range(0, self.N))
+        for i in n:
+            self.numbers.pool_put(i)
+        self.assertEqual(self.numbers._set, set(n))
+
+    def test_parallel_allocate_all(self):
+        """Allocate all pool objects in parallel"""
+        def allocate_one(pool, results, index):
+            n = pool.pool_get()
+            results[index] = n
+
+        results = [None] * self.N
+        threads = [threading.Thread(target=allocate_one,
+                                    args=(self.numbers, results, i,))
+                   for i in xrange(0, self.N)]
+
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        # This nonblocking pool_get() should fail
+        self.assertRaises(PoolEmptyError, self.numbers.pool_get,
+                          blocking=False)
+        self.assertEqual(sorted(results), range(0, self.N))
+
+    def test_allocate_no_create(self):
+        """Allocate objects from the pool without creating them"""
+        for i in xrange(0, self.N):
+            self.assertIsNone(self.numbers.pool_get(create=False))
+
+        # This nonblocking pool_get() should fail
+        self.assertRaises(PoolEmptyError, self.numbers.pool_get,
+                          blocking=False)
+
+    def test_pool_cleanup_returns_failure(self):
+        """Put a broken object, test a new one is retrieved eventually"""
+        n = []
+        for _ in xrange(0, self.N):
+            n.append(self.numbers.pool_get())
+        self.assertEqual(n, range(0, self.N))
+
+        del n[-1:]
+        self.numbers.pool_put(-1)  # This is a broken object
+        self.assertFalse(self.numbers._set)
+        self.assertEqual(self.numbers.pool_get(), self.N)
+
+    def test_parallel_get_blocks(self):
+        """Test threads block if no object left in the pool"""
+        def allocate_one_and_sleep(pool, sec, result, index):
+            n = pool.pool_get()
+            time.sleep(sec)
+            result[index] = n
+            pool.pool_put(n)
+
+        results = [None] * (2 * self.N + 1)
+        threads = [threading.Thread(target=allocate_one_and_sleep,
+                                    args=(self.numbers, self.SEC, results, i,))
+                   for i in xrange(0, 2 * self.N + 1)]
+
+        # This should take 3 * SEC seconds
+        start = time.time()
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+        diff = time.time() - start
+        self.assertTrue(diff > 3 * self.SEC)
+        self.assertLess((diff - 3 * self.SEC) / 3 * self.SEC, .5)
+
+        # One number must have been used three times,
+        # all others must have been used once
+        freq = {}
+        for r in results:
+            freq[r] = freq.get(r, 0) + 1
+        self.assertTrue(len([r for r in results if freq[r] == 2]), self. N)
+        triples = [r for r in freq if freq[r] == 3]
+        self.assertTrue(len(triples), 1)
+        self.assertEqual(sorted(results),
+                         sorted(2 * range(0, self.N) + triples))
+
+
+if __name__ == '__main__':
+    unittest.main()
index 004df6d..16af9cc 100644 (file)
@@ -126,8 +126,9 @@ class HTTPRequest(HTTPConnection):
                                param_str+= '='+unicode(val)
                        self.url += param_str
 
+               #use pool before request, so that it will block if pool is full
+               res = self._get_response_object()
                self._response_object = requests.request(self.method, self.url, headers=self.headers, data=data,
                        verify=self.verify, prefetch = False)
-               res = self._get_response_object()
                res.request = self._response_object.request
                return res