-# Copyright 2011-2012 GRNET S.A. All rights reserved.
+# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
-from urllib2 import quote
+from urllib2 import quote, unquote
from urlparse import urlparse
from threading import Thread
from json import dumps, loads
from time import time
-from httplib import ResponseNotReady
+from httplib import ResponseNotReady, HTTPException
from time import sleep
from random import random
+from logging import getLogger
from objpool.http import PooledHTTPConnection
-from kamaki import logger
-LOG_FILE = logger.get_log_filename()
TIMEOUT = 60.0 # seconds
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
-log = logger.add_file_logger(__name__)
-log.debug('Logging location: %s' % logger.get_log_filename())
-
-sendlog = logger.add_file_logger('clients.send')
-recvlog = logger.add_file_logger('clients.recv')
+log = getLogger(__name__)
+sendlog = getLogger('%s.send' % __name__)
+recvlog = getLogger('%s.recv' % __name__)
def _encode(v):
LOG_TOKEN = False
LOG_DATA = False
+ LOG_PID = False
+ _token = None
class RequestManager(Logged):
:returns: (scheme, netloc)
"""
- url = _encode(url) if url else 'http://127.0.0.1/'
+ url = _encode(str(url)) if url else 'http://127.0.0.1/'
url += '' if url.endswith('/') else '/'
if path:
url += _encode(path[1:] if path.startswith('/') else path)
delim = '?'
for key, val in params.items():
- val = _encode(val)
+ val = '' if val in (None, False) else _encode(u'%s' % val)
url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
delim = '&'
parsed = urlparse(url)
self.method, self.data = method, data
self.scheme, self.netloc = self._connection_info(url, path, params)
-    def log(self):
-        sendlog.info('%s %s://%s%s\t[%s]' % (
-            self.method,
-            self.scheme,
-            self.netloc,
-            self.path,
-            self))
+    def dump_log(self):
+        """Write the outgoing request to the send logger.
+
+        Logs a separator, the request line, every header, the data size
+        and (only if LOG_DATA is set) the data itself. Unless LOG_TOKEN
+        is set, the X-Auth-Token value is logged as '...' and remembered
+        in self._token, so that it can be masked inside the logged data
+        as well. If LOG_PID is set, each line is tagged with this object.
+        """
+        # The '[%s]' tag has to be filled in here: the log calls below
+        # only append plog, they never interpolate into it
+        plog = ('\t[%s]' % self) if self.LOG_PID else ''
+        sendlog.info('- - - - - - -')
+        sendlog.info('%s %s://%s%s%s' % (
+            self.method, self.scheme, self.netloc, self.path, plog))
         for key, val in self.headers.items():
-            if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
-                continue
-            sendlog.info(' %s: %s\t[%s]' % (key, val, self))
+            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
+                # Only the local copy is masked, self.headers is untouched
+                self._token, val = val, '...'
+            sendlog.info(' %s: %s%s' % (key, val, plog))
         if self.data:
-            sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
+            sendlog.info('data size:%s%s' % (len(self.data), plog))
             if self.LOG_DATA:
-                sendlog.info(self.data)
+                sendlog.info(self.data.replace(self._token, '...') if (
+                    self._token) else self.data)
         else:
-            sendlog.info('data size:0\t[%s]' % self)
-        sendlog.info('')
+            sendlog.info('data size:0%s' % plog)
def perform(self, conn):
"""
:returns: (HTTPResponse)
"""
+ self.dump_log()
conn.request(
method=str(self.method.upper()),
url=str(self.path),
headers=self.headers,
body=self.data)
- self.log()
+ sendlog.info('')
keep_trying = TIMEOUT
while keep_trying > 0:
try:
wait = 0.03 * random()
sleep(wait)
keep_trying -= wait
-        logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
+        # Fill the '[%s]' tag with this request; otherwise the literal
+        # placeholder would end up in the timeout message
+        plog = ('\t[%s]' % self) if self.LOG_PID else ''
+        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
recvlog.debug(logmsg)
raise ClientError('HTTPResponse takes too long - kamaki timeout')
class ResponseManager(Logged):
"""Manage the http request and handle the response data, headers, etc."""
- def __init__(self, request, poolsize=None):
+ def __init__(self, request, poolsize=None, connection_retry_limit=0):
"""
:param request: (RequestManager)
+
+ :param poolsize: (int) the size of the connection pool
+
+ :param connection_retry_limit: (int)
"""
+ self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
self.request = request
self._request_performed = False
self.poolsize = poolsize
return
pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
- try:
- with PooledHTTPConnection(
- self.request.netloc, self.request.scheme,
- **pool_kw) as connection:
- self.request.LOG_TOKEN = self.LOG_TOKEN
- self.request.LOG_DATA = self.LOG_DATA
- r = self.request.perform(connection)
- recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
- self, r, self.request))
- self._request_performed = True
- self._status_code, self._status = r.status, r.reason
- recvlog.info(
- '%d %s\t[p: %s]' % (self.status_code, self.status, self))
- self._headers = dict()
- for k, v in r.getheaders():
- if (not self.LOG_TOKEN) and k.lower() == 'x-auth-token':
- continue
- self._headers[k] = v
- recvlog.info(' %s: %s\t[p: %s]' % (k, v, self))
- self._content = r.read()
- recvlog.info('data size: %s\t[p: %s]' % (
- len(self._content) if self._content else 0,
- self))
- if self.LOG_DATA and self._content:
- recvlog.info('%s\t[p: %s]' % (self._content, self))
- except Exception as err:
- from traceback import format_stack
- recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
- raise ClientError(
- 'Failed while http-connecting to %s (%s)' % (
- self.request.url,
- err))
+        # Try the request up to CONNECTION_TRY_LIMIT times. Only failures
+        # that are HTTPException instances are retried; any other error
+        # is reported as a ClientError immediately.
+        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
+            try:
+                with PooledHTTPConnection(
+                        self.request.netloc, self.request.scheme,
+                        **pool_kw) as connection:
+                    self.request.LOG_TOKEN = self.LOG_TOKEN
+                    self.request.LOG_DATA = self.LOG_DATA
+                    self.request.LOG_PID = self.LOG_PID
+                    r = self.request.perform(connection)
+                    plog = ''
+                    if self.LOG_PID:
+                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
+                            self, r, self.request))
+                        plog = '\t[%s]' % self
+                    self._request_performed = True
+                    self._status_code, self._status = r.status, unquote(
+                        r.reason)
+                    recvlog.info(
+                        '%d %s%s' % (
+                            self.status_code, self.status, plog))
+                    self._headers = dict()
+                    for k, v in r.getheaders():
+                        # lower() must be called: comparing the bound
+                        # method itself never matches, which would leave
+                        # the token unmasked in the log
+                        hide = k.lower() in ('x-auth-token', ) and (
+                            not self.LOG_TOKEN)
+                        if hide:
+                            self._token = v
+                        v = unquote(v)
+                        # Keep the real value for callers; mask only
+                        # what is written to the log
+                        self._headers[k] = v
+                        recvlog.info(' %s: %s%s' % (
+                            k, '...' if hide else v, plog))
+                    self._content = r.read()
+                    recvlog.info('data size: %s%s' % (
+                        len(self._content) if self._content else 0, plog))
+                    if self.LOG_DATA and self._content:
+                        data = '%s%s' % (self._content, plog)
+                        if self._token:
+                            data = data.replace(self._token, '...')
+                        # Response content belongs to the receive log
+                        recvlog.info(data)
+                    sendlog.info('- - - - - - -')
+                break
+            except Exception as err:
+                if isinstance(err, HTTPException):
+                    # Give up only after the last allowed attempt,
+                    # otherwise fall through to the next iteration
+                    if retries >= self.CONNECTION_TRY_LIMIT:
+                        raise ClientError(
+                            'Connection to %s failed %s times (%s: %s )' % (
+                                self.request.url, retries, type(err), err))
+                else:
+                    from traceback import format_stack
+                    recvlog.debug(
+                        '\n'.join(['%s' % type(err)] + format_stack()))
+                    raise ClientError(
+                        'Failed while http-connecting to %s (%s)' % (
+                            self.request.url, err))
@property
def status_code(self):
self._exception = e
-class Client(object):
+class Client(Logged):
MAX_THREADS = 7
- DATE_FORMATS = [
- '%a %b %d %H:%M:%S %Y',
- '%A, %d-%b-%y %H:%M:%S GMT',
- '%a, %d %b %Y %H:%M:%S GMT']
- LOG_TOKEN = False
- LOG_DATA = False
+ DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
+ CONNECTION_RETRY_LIMIT = 0
def __init__(self, base_url, token):
+ assert base_url, 'No base_url for client %s' % self
self.base_url = base_url
self.token = token
self.headers, self.params = dict(), dict()
return []
return threadlist
+    def async_run(self, method, kwarg_list):
+        """Fire threads of operations
+
+        :param method: the method to run in each thread
+
+        :param kwarg_list: (list of dicts) the arguments to pass in each
+            method call
+
+        :returns: (list) the results of each method call w.r. to the order
+            of kwarg_list
+
+        :raises: the exception stored on a thread, as soon as it is noticed
+        """
+        flying, results = {}, {}
+        self._init_thread_limit()
+        for index, kwargs in enumerate(kwarg_list):
+            self._watch_thread_limit(flying.values())
+            flying[index] = SilentEvent(method=method, **kwargs)
+            flying[index].start()
+            # Harvest the threads that have already finished, so that
+            # only live threads are kept in flying
+            unfinished = {}
+            for key, thread in flying.items():
+                if thread.isAlive():
+                    unfinished[key] = thread
+                elif thread.exception:
+                    raise thread.exception
+                else:
+                    results[key] = thread.value
+            flying = unfinished
+        sendlog.info('- - - wait for threads to finish')
+        for key, thread in flying.items():
+            if thread.isAlive():
+                thread.join()
+            if thread.exception:
+                raise thread.exception
+            results[key] = thread.value
+        # Keys are the positions in kwarg_list; sort them explicitly,
+        # since the order of dict.values() is not guaranteed
+        return [results[key] for key in sorted(results)]
+
def _raise_for_status(self, r):
log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
status_msg = getattr(r, 'status', None) or ''
def set_header(self, name, value, iff=True):
"""Set a header 'name':'value'"""
if value is not None and iff:
- self.headers[name] = value
+ self.headers[name] = unicode(value)
def set_param(self, name, value=None, iff=True):
if iff:
-            self.params[name] = value
+            # Keep None and False as they are: unicode(None) is u'None',
+            # which would be sent as "name=None" instead of a bare "name"
+            self.params[name] = (
+                value if value is None or value is False
+                else unicode(value))
def request(
self, method, path,
if data:
headers.setdefault('Content-Length', '%s' % len(data))
-            sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
+            # Fill the '[%s]' tag with this client; otherwise the literal
+            # placeholder would be appended to the debug line
+            plog = ('\t[%s]' % self) if self.LOG_PID else ''
+            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
req = RequestManager(
method, self.base_url, path,
data=data, headers=headers, params=params)
# req.log()
-            r = ResponseManager(req)
-            r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
+            r = ResponseManager(
+                req, connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
+            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
+                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
+            # The token is used only for masking logged data; a request
+            # without an X-Auth-Token header must not fail with KeyError
+            r._token = headers.get('X-Auth-Token')
finally:
self.headers = dict()
self.params = dict()