import json
import logging
from .connection import HTTPConnectionError
+from .connection.request import HTTPRequest
+#from .connection.kamakicon import KamakiHTTPConnection
sendlog = logging.getLogger('clients.send')
recvlog = logging.getLogger('clients.recv')
class Client(object):
def __init__(self, base_url, token, http_client=None):
+ """Store the endpoint, the auth token and the HTTP connection to use.
+
+ http_client: connection object used to perform requests. When it is
+ None (the default) a new HTTPRequest is created for this instance.
+ """
self.base_url = base_url
self.token = token
self.headers = {}
"%a, %d %b %Y %H:%M:%S GMT"]
- self.http_client = http_client
+ # A default written as HTTPRequest() in the signature is evaluated only
+ # once, so every Client would share one connection object and overwrite
+ # each other's url, headers and params. Build the default per instance.
+ self.http_client = HTTPRequest() if http_client is None else http_client
- def raise_for_status(self, r):
+ def _raise_for_status(self, r):
message = "%d %s" % (r.status_code, r.status)
try:
details = r.text
data = kwargs.pop('data', None)
self.set_default_header('X-Auth-Token', self.token)
+ #self.set_default_header('Accept', '*/*')
+ #self.set_default_header('Accept-Encoding', 'identity, deflate, compress, gzip')
if 'json' in kwargs:
data = json.dumps(kwargs.pop('json'))
#kwargs.setdefault('verify', False) # Disable certificate verification
self.http_client.url = self.base_url + path
- self.http_client.perform_request(method=method, data=data)
+ r = self.http_client.perform_request(method=method, data=data)
#r = requests.request(method, url, headers=self.headers, data=data, **kwargs)
req = self.http_client
if data:
sendlog.info('%s', data)
- r = self.http_client.response
+ #r = self.http_client.response
+ #print('What is the case with r? %s'%unicode(r.content))
recvlog.info('%d %s', r.status_code, r.status)
for key, val in r.headers.items():
recvlog.info('%s: %s', key, val)
- # Success can either be an in or a collection
+ # Success can either be an int or a collection
success = (success,) if isinstance(success, int) else success
if r.status_code not in success:
- self.raise_for_status(r)
+ self._raise_for_status(r)
except Exception as err:
self.http_client.reset_headers()
self.http_client.reset_params()
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
+from .pool import ObjectPool
+
+POOL_SIZE=8
+
+class HTTPResponsePool(ObjectPool):
+    """ObjectPool of HTTPResponse objects, labelled with a network location."""
+
+    def __init__(self, netloc, size=POOL_SIZE):
+        """Set up the pool; `size` is passed through to ObjectPool."""
+        super(HTTPResponsePool, self).__init__(size=size)
+        self.netloc = netloc
+
+    def _pool_create(self):
+        """Build a new HTTPResponse and record this pool on it as `_pool`."""
+        fresh = HTTPResponse()
+        fresh._pool = self
+        return fresh
+
+    def _pool_cleanup(self, resp):
+        """Call `resp._get_response()` and then return True."""
+        resp._get_response()
+        return True
+
class HTTPResponse(object):
def __init__(self, request=None, prefetched=False):
self = self.request.response
self.prefetched = True
+ def release(self):
+ """Release the connection.
+
+ Call this once you have finished using the response. This base
+ implementation always raises NotImplementedError.
+ """
+ raise NotImplementedError
+
@property
def prefetched(self):
return self._prefetched
"""
raise NotImplementedError
+ """
@property
def response(self):
return self._response
@response.setter
def response(self, r):
self._response = r
+ """
import requests
from copy import deepcopy
-from . import HTTPConnection, HTTPResponse, HTTPConnectionError
-#from requests.auth import AuthBase
+from . import HTTPConnection, HTTPResponse, HTTPConnectionError, HTTPResponsePool
+from .pool import ObjectPool
+from urlparse import urlparse
# Add a convenience status property to the responses
def _status(self):
class HTTPRequestsResponse(HTTPResponse):
- def __init__(self, requestsResponse, prefetched = False):
+ def _get_response(self):
+ if self.prefetched:
+ return
+ r = self.request.response
try:
- self.headers = requestsResponse.headers
- self.text = requestsResponse.text if hasattr(requestsResponse, 'text') else None
- self.json = requestsResponse.json if hasattr(requestsResponse, 'json') else None
- self.content = requestsResponse.content if hasattr(requestsResponse, 'content') else None
- self.exception = requestsResponse.exception if hasattr(requestsResponse, 'exception') else None
- self.status = requestsResponse.status
- self.status_code = requestsResponse.status_code
+ self.headers = deepcopy(r.headers)
+ self.text = deepcopy(r.text) \
+ if hasattr(r, 'text') else None
+ self.json = deepcopy(r.json) \
+ if hasattr(r, 'json') else None
+ self.content = deepcopy(r.content) \
+ if hasattr(r, 'content') else None
+ self.exception = deepcopy(r.exception) \
+ if hasattr(r, 'exception') else None
+ self.status = deepcopy(r.status)
+ self.status_code = deepcopy(r.status_code)
except requests.ConnectionError as err:
raise HTTPConnectionError('Connection error', status=651, details=err.message)
except requests.HTTPError as err:
except requests.RequestException as err:
raise HTTPConnectionError('HTTP Request error', status=700, details=err.message)
- def _get_response(self):
+ def release(self):
- """requests object handles this automatically"""
- pass
+ """Hand this response object back to its pool, if it has one."""
+ if hasattr(self, '_pool'):
+ self._pool.pool_put(self)
+
+class HTTPRequestsResponsePool(HTTPResponsePool):
+    """Response pool whose objects are HTTPRequestsResponse instances."""
+
+    @classmethod
+    def key(cls, full_url):
+        """Return the pool key for `full_url`, built as scheme:netloc:port."""
+        parts = urlparse(full_url)
+        return '%s:%s:%s' % (parts.scheme, parts.netloc, parts.port)
+
+    def _pool_create(self):
+        """Build a new HTTPRequestsResponse and record this pool on it."""
+        fresh = HTTPRequestsResponse()
+        fresh._pool = self
+        return fresh
class HTTPRequest(HTTPConnection):
+ _pools = {}
+
#Avoid certificate verification by default
verify = False
- def _copy_response(self):
- req = HTTPRequest(deepcopy(self.method), deepcopy(self.url), deepcopy(self.params),
- deepcopy(self.headers))
- req._response_object = self._response_object
- res = HTTPResponse(req)
- return res
+    def _get_response_object(self):
+        """Fetch a response object from the pool that matches self.url.
+
+        Pools are kept in the `_pools` dict under the key returned by
+        HTTPRequestsResponsePool.key(self.url); a pool is created the first
+        time its key is requested.
+        """
+        url_key = HTTPRequestsResponsePool.key(self.url)
+        if url_key not in self._pools:
+            self._pools[url_key] = HTTPRequestsResponsePool(url_key)
+        return self._pools[url_key].pool_get()
def perform_request(self, method=None, url=None, params=None, headers=None, data=None):
"""perform a request
self._response_object = requests.request(self.method, self.url, headers=self.headers, data=data,
verify=self.verify, prefetch = False)
- res = HTTPRequestsResponse(self._response_object)
- self.response = res
+ res = self._get_response_object()
+ res.request = self._response_object.request
return res
r = self.container_post(update=True, content_type='application/octet-stream',
content_length=len(data), data=data, format='json')
assert r.json[0] == hash, 'Local hash does not match server'
+ r.release()
def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
content_disposition=None, content_type=None, sharing=None, public=None):
upload_gen.next()
flying = []
+ r .release()
for hash in missing:
offset, bytes = map[hash]
f.seek(offset)
for r in flying:
if r.ready():
if r.exception:
+ r.release()
raise r.exception
if upload_cb:
upload_gen.next()
from . import Client, ClientError
from .utils import filter_in, filter_out, prefix_keys, path4url
-from .connection.request import HTTPRequest
+from .connection.kamakicon import KamakiHTTPConnection
class StorageClient(Client):
"""OpenStack Object Storage API 1.0 client"""
def __init__(self, base_url, token, account=None, container=None):
- super(StorageClient, self).__init__(base_url, token, http_client=HTTPRequest())
+ super(StorageClient, self).__init__(base_url, token)
+ #super(StorageClient, self).__init__(base_url, token, http_client=KamakiHTTPConnection())
self.account = account
self.container = container