Use connection pool after redesigning http connect
authorStavros Sachtouris <saxtouri@admin.grnet.gr>
Fri, 7 Sep 2012 09:51:18 +0000 (12:51 +0300)
committerStavros Sachtouris <saxtouri@admin.grnet.gr>
Fri, 7 Sep 2012 09:51:18 +0000 (12:51 +0300)
HTTPRequestsXxx objects now use connection pooling
Atually, connections are not pooled, but responses are.

Implementation of connection pooling with httplib not ready yet

Pooling tested in uploads

kamaki/clients/__init__.py
kamaki/clients/connection/__init__.py
kamaki/clients/connection/request.py
kamaki/clients/pithos.py
kamaki/clients/storage.py

index 19f97fc..c836a2e 100644 (file)
@@ -34,6 +34,8 @@
 import json
 import logging
 from .connection import HTTPConnectionError
+from .connection.request import HTTPRequest
+#from .connection.kamakicon import KamakiHTTPConnection
 
 sendlog = logging.getLogger('clients.send')
 recvlog = logging.getLogger('clients.recv')
@@ -48,7 +50,7 @@ class ClientError(Exception):
 
 class Client(object):
 
-    def __init__(self, base_url, token, http_client=None):
+    def __init__(self, base_url, token, http_client=HTTPRequest()):
         self.base_url = base_url
         self.token = token
         self.headers = {}
@@ -57,7 +59,7 @@ class Client(object):
             "%a, %d %b %Y %H:%M:%S GMT"]
         self.http_client = http_client
 
-    def raise_for_status(self, r):
+    def _raise_for_status(self, r):
         message = "%d %s" % (r.status_code, r.status)
         try:
             details = r.text
@@ -84,6 +86,8 @@ class Client(object):
 
             data = kwargs.pop('data', None)
             self.set_default_header('X-Auth-Token', self.token)
+            #self.set_default_header('Accept', '*/*')
+            #self.set_default_header('Accept-Encoding', 'identity, deflate, compress, gzip')
 
             if 'json' in kwargs:
                 data = json.dumps(kwargs.pop('json'))
@@ -93,7 +97,7 @@ class Client(object):
 
             #kwargs.setdefault('verify', False)  # Disable certificate verification
             self.http_client.url = self.base_url + path
-            self.http_client.perform_request(method=method, data=data)
+            r = self.http_client.perform_request(method=method, data=data)
             #r = requests.request(method, url, headers=self.headers, data=data, **kwargs)
 
             req = self.http_client
@@ -104,7 +108,8 @@ class Client(object):
             if data:
                 sendlog.info('%s', data)
 
-            r = self.http_client.response
+            #r = self.http_client.response
+            #print('What is the case with r? %s'%unicode(r.content))
             recvlog.info('%d %s', r.status_code, r.status)
             for key, val in r.headers.items():
                 recvlog.info('%s: %s', key, val)
@@ -116,7 +121,7 @@ class Client(object):
                 # Success can either be an in or a collection
                 success = (success,) if isinstance(success, int) else success
                 if r.status_code not in success:
-                    self.raise_for_status(r)
+                    self._raise_for_status(r)
         except Exception as err:
             self.http_client.reset_headers()
             self.http_client.reset_params()
index ff041d0..b95f177 100644 (file)
 # interpreted as representing official policies, either expressed
 # or implied, of GRNET S.A.
 
+from .pool import ObjectPool
+
+POOL_SIZE=8
+
+class HTTPResponsePool(ObjectPool):
+
+    def __init__(self, netloc, size=POOL_SIZE):
+        super(HTTPResponsePool, self).__init__(size=size)
+        self.netloc = netloc
+
+    def _pool_create(self):
+        resp = HTTPResponse()
+        resp._pool = self
+        return resp
+
+    def _pool_cleanup(self, resp):
+        resp._get_response()
+        return True
+
 class HTTPResponse(object):
 
     def __init__(self, request=None, prefetched=False):
@@ -46,6 +65,11 @@ class HTTPResponse(object):
         self = self.request.response
         self.prefetched = True
 
+    def release(self):
+        """Release the connection.
+        Use this after finished using the response"""
+        raise NotImplementedError
+
     @property 
     def prefetched(self):
         return self._prefetched
@@ -182,9 +206,11 @@ class HTTPConnection(object):
                """
                raise NotImplementedError
 
+    """
     @property 
     def response(self):
         return self._response
     @response.setter
     def response(self, r):
         self._response = r
+    """
index cc15065..004df6d 100644 (file)
@@ -33,8 +33,9 @@
 
 import requests
 from copy import deepcopy
-from . import HTTPConnection, HTTPResponse, HTTPConnectionError
-#from requests.auth import AuthBase
+from . import HTTPConnection, HTTPResponse, HTTPConnectionError, HTTPResponsePool
+from .pool import ObjectPool
+from urlparse import urlparse
 
 # Add a convenience status property to the responses
 def _status(self):
@@ -43,15 +44,22 @@ requests.Response.status = property(_status)
 
 class HTTPRequestsResponse(HTTPResponse):
 
-       def __init__(self, requestsResponse, prefetched = False):
+       def _get_response(self):
+               if self.prefetched:
+                       return
+               r = self.request.response
                try:
-                       self.headers = requestsResponse.headers
-                       self.text = requestsResponse.text if hasattr(requestsResponse, 'text') else None
-                       self.json = requestsResponse.json if hasattr(requestsResponse, 'json') else None
-                       self.content = requestsResponse.content if hasattr(requestsResponse, 'content') else None
-                       self.exception = requestsResponse.exception if hasattr(requestsResponse, 'exception') else None
-                       self.status = requestsResponse.status
-                       self.status_code = requestsResponse.status_code
+                       self.headers = deepcopy(r.headers)
+                       self.text = deepcopy(r.text) \
+                       if hasattr(r, 'text') else None
+                       self.json = deepcopy(r.json) \
+                       if hasattr(r, 'json') else None
+                       self.content = deepcopy(r.content) \
+                       if hasattr(r, 'content') else None
+                       self.exception = deepcopy(r.exception) \
+                       if hasattr(r, 'exception') else None
+                       self.status = deepcopy(r.status)
+                       self.status_code = deepcopy(r.status_code)
                except requests.ConnectionError as err:
                        raise HTTPConnectionError('Connection error', status=651, details=err.message)
                except requests.HTTPError as err:
@@ -63,21 +71,38 @@ class HTTPRequestsResponse(HTTPResponse):
                except requests.RequestException as err:
                        raise HTTPConnectionError('HTTP Request error', status=700, details=err.message)
 
-       def _get_response(self):
+       def release(self):
                """requests object handles this automatically"""
-               pass
+               if hasattr(self, '_pool'):
+                       self._pool.pool_put(self)
+
+class HTTPRequestsResponsePool(HTTPResponsePool):
+
+       @classmethod
+       def key(self, full_url):
+               p = urlparse(full_url)
+               return '%s:%s:%s'%(p.scheme,p.netloc, p.port)
+
+       def _pool_create(self):
+               resp = HTTPRequestsResponse()
+               resp._pool = self
+               return resp
 
 class HTTPRequest(HTTPConnection):
 
+       _pools = {}
+
        #Avoid certificate verification by default
        verify = False
 
-       def _copy_response(self):
-               req = HTTPRequest(deepcopy(self.method), deepcopy(self.url), deepcopy(self.params),
-                       deepcopy(self.headers))
-               req._response_object = self._response_object
-               res = HTTPResponse(req)
-               return res
+       def _get_response_object(self):
+               pool_key = HTTPRequestsResponsePool.key(self.url)
+               try:
+                       respool = self._pools[pool_key]
+               except KeyError:
+                       self._pools[pool_key] = HTTPRequestsResponsePool(pool_key)
+                       respool = self._pools[pool_key]
+               return respool.pool_get()
 
        def perform_request(self, method=None, url=None, params=None, headers=None, data=None):
                """perform a request
@@ -103,6 +128,6 @@ class HTTPRequest(HTTPConnection):
 
                self._response_object = requests.request(self.method, self.url, headers=self.headers, data=data,
                        verify=self.verify, prefetch = False)
-               res  = HTTPRequestsResponse(self._response_object)
-               self.response = res
+               res = self._get_response_object()
+               res.request = self._response_object.request
                return res
index 0f96146..1df00bf 100644 (file)
@@ -647,6 +647,7 @@ class PithosClient(StorageClient):
         r = self.container_post(update=True, content_type='application/octet-stream',
             content_length=len(data), data=data, format='json')
         assert r.json[0] == hash, 'Local hash does not match server'
+        r.release()
 
     def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
         content_disposition=None, content_type=None, sharing=None, public=None):
@@ -709,6 +710,7 @@ class PithosClient(StorageClient):
             upload_gen.next()
 
         flying = []
+        r .release()
         for hash in missing:
             offset, bytes = map[hash]
             f.seek(offset)
@@ -718,6 +720,7 @@ class PithosClient(StorageClient):
             for r in flying:
                 if r.ready():
                     if r.exception:
+                        r.release()
                         raise r.exception
                     if upload_cb:
                         upload_gen.next()
index f6a611a..0e44e0f 100644 (file)
 
 from . import Client, ClientError
 from .utils import filter_in, filter_out, prefix_keys, path4url
-from .connection.request import HTTPRequest
+from .connection.kamakicon import KamakiHTTPConnection
 
 class StorageClient(Client):
     """OpenStack Object Storage API 1.0 client"""
 
     def __init__(self, base_url, token, account=None, container=None):
-        super(StorageClient, self).__init__(base_url, token, http_client=HTTPRequest())
+        super(StorageClient, self).__init__(base_url, token)
+        #super(StorageClient, self).__init__(base_url, token, http_client=KamakiHTTPConnection())
         self.account = account
         self.container = container