1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from urllib2 import quote
35 from urlparse import urlparse
36 from threading import Thread
37 from json import dumps, loads
39 from httplib import ResponseNotReady
40 from time import sleep
41 from random import random
43 from objpool.http import PooledHTTPConnection
45 from kamaki.clients.utils import logger
47 DEBUG_LOG = logger.get_log_filename()
48 TIMEOUT = 60.0 # seconds
49 HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
53 logger.add_file_logger('clients.send', __name__, filename=DEBUG_LOG)
54 sendlog = logger.get_logger('clients.send')
55 sendlog.debug('Logging location: %s' % DEBUG_LOG)
57 logger.add_file_logger('data.send', __name__, filename=DEBUG_LOG)
58 datasendlog = logger.get_logger('data.send')
60 logger.add_file_logger('clients.recv', __name__, filename=DEBUG_LOG)
61 recvlog = logger.get_logger('clients.recv')
63 logger.add_file_logger('data.recv', __name__, filename=DEBUG_LOG)
64 datarecvlog = logger.get_logger('data.recv')
66 logger.add_file_logger('ClientError', __name__, filename=DEBUG_LOG)
67 clienterrorlog = logger.get_logger('ClientError')
71 if v and isinstance(v, unicode):
72 return quote(v.encode('utf-8'))
76 class ClientError(Exception):
77 def __init__(self, message, status=0, details=None):
78 clienterrorlog.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
83 message += '' if message and message[-1] == '\n' else '\n'
84 serv_stat, sep, new_msg = message.partition('{')
85 new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
86 json_msg = loads(new_msg)
87 key = json_msg.keys()[0]
88 serv_stat = serv_stat.strip()
90 json_msg = json_msg[key]
91 message = '%s %s (%s)\n' % (
94 json_msg['message']) if (
95 'message' in json_msg) else '%s %s' % (serv_stat, key)
96 status = json_msg.get('code', status)
97 if 'details' in json_msg:
100 if not isinstance(details, list):
102 if json_msg['details']:
103 details.append(json_msg['details'])
107 while message.endswith('\n\n'):
108 message = message[:-1]
109 super(ClientError, self).__init__(message)
110 self.status = status if isinstance(status, int) else 0
111 self.details = details if details else []
114 class RequestManager(object):
115 """Handle http request information"""
117 def _connection_info(self, url, path, params={}):
118 """ Set self.url to scheme://netloc/?params
119 :param url: (str or unicode) The service url
121 :param path: (str or unicode) The service path (url/path)
123 :param params: (dict) Parameters to add to final url
125 :returns: (scheme, netloc)
127 url = _encode(url) if url else 'http://127.0.0.1/'
128 url += '' if url.endswith('/') else '/'
130 url += _encode(path[1:] if path.startswith('/') else path)
132 for key, val in params.items():
134 url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
136 parsed = urlparse(url)
138 self.path = parsed.path or '/'
140 self.path += '?%s' % parsed.query
141 return (parsed.scheme, parsed.netloc)
144 self, method, url, path,
145 data=None, headers={}, params={}):
146 method = method.upper()
147 assert method in HTTP_METHODS, 'Invalid http method %s' % method
149 assert isinstance(headers, dict)
150 self.headers = dict(headers)
151 self.method, self.data = method, data
152 self.scheme, self.netloc = self._connection_info(url, path, params)
155 sendlog.debug('%s %s://%s%s\t[%s]' % (
161 for key, val in self.headers.items():
162 if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
164 sendlog.debug('%s: %s\t[%s]', (key, val, self))
166 sendlog.debug('data size:%s\t[%s]' % (len(self.data), self))
168 datasendlog.info(self.data)
170 sendlog.debug('data size:0\t[%s]' % self)
173 def perform(self, conn):
175 :param conn: (httplib connection object)
177 :returns: (HTTPResponse)
180 method=str(self.method.upper()),
182 headers=self.headers,
186 while keep_trying > 0:
188 return conn.getresponse()
189 except ResponseNotReady:
190 wait = 0.03 * random()
193 logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
194 recvlog.debug(logmsg)
195 raise ClientError('HTTPResponse takes too long - kamaki timeout')
198 class ResponseManager(object):
199 """Manage the http request and handle the response data, headers, etc."""
201 def __init__(self, request, poolsize=None):
203 :param request: (RequestManager)
205 self.request = request
206 self._request_performed = False
207 self.poolsize = poolsize
209 def _get_response(self):
210 if self._request_performed:
213 pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
215 with PooledHTTPConnection(
216 self.request.netloc, self.request.scheme,
217 **pool_kw) as connection:
218 r = self.request.perform(connection)
219 recvlog.debug('[resp: %s] <-- [req: %s]\n' % (r, self.request))
220 self._request_performed = True
221 self._status_code, self._status = r.status, r.reason
223 '%d %s\t[p: %s]' % (self.status_code, self.status, self))
224 self._headers = dict()
225 for k, v in r.getheaders():
226 if (not LOG_TOKEN) and k.lower() == 'x-auth-token':
229 recvlog.debug(' %s: %s\t[p: %s]' % (k, v, self))
230 self._content = r.read()
231 recvlog.debug('data size: %s\t[p: %s]' % (
232 len(self._content) if self._content else 0,
234 if LOG_DATA and self._content:
235 datarecvlog.debug('%s\t[p: %s]' % (self._content, self))
236 except Exception as err:
237 from traceback import format_stack
238 recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
240 'Failed while http-connecting to %s (%s)' % (
245 def status_code(self):
247 return self._status_code
267 :returns: (str) content
270 return '%s' % self._content
275 :returns: (dict) squeezed from json-formated content
279 return loads(self._content)
280 except ValueError as err:
281 raise ClientError('Response not formated in JSON - %s' % err)
284 class SilentEvent(Thread):
285 """Thread-run method(*args, **kwargs)"""
286 def __init__(self, method, *args, **kwargs):
287 super(self.__class__, self).__init__()
294 return getattr(self, '_exception', False)
298 return getattr(self, '_value', None)
302 self._value = self.method(*(self.args), **(self.kwargs))
303 except Exception as e:
304 recvlog.debug('Thread %s got exception %s\n<%s %s' % (
307 e.status if isinstance(e, ClientError) else '',
312 class Client(object):
316 '%a %b %d %H:%M:%S %Y',
317 '%A, %d-%b-%y %H:%M:%S GMT',
318 '%a, %d %b %Y %H:%M:%S GMT']
320 def __init__(self, base_url, token):
321 self.base_url = base_url
323 self.headers, self.params = dict(), dict()
325 def _init_thread_limit(self, limit=1):
326 assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
327 self._thread_limit = limit
328 self._elapsed_old = 0.0
329 self._elapsed_new = 0.0
331 def _watch_thread_limit(self, threadlist):
332 self._thread_limit = getattr(self, '_thread_limit', 1)
333 self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
334 self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
335 recvlog.debug('# running threads: %s' % len(threadlist))
336 if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
337 self._thread_limit < self.MAX_THREADS):
338 self._thread_limit += 1
339 elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
340 self._thread_limit -= 1
342 self._elapsed_old = self._elapsed_new
343 if len(threadlist) >= self._thread_limit:
344 self._elapsed_new = 0.0
345 for thread in threadlist:
348 self._elapsed_new += time() - begin_time
349 self._elapsed_new = self._elapsed_new / len(threadlist)
353 def _raise_for_status(self, r):
354 clienterrorlog.debug('raise err from [%s] of type[%s]' % (r, type(r)))
355 status_msg = getattr(r, 'status', None) or ''
357 message = '%s %s\n' % (status_msg, r.text)
359 message = '%s %s\n' % (status_msg, r)
360 status = getattr(r, 'status_code', getattr(r, 'status', 0))
361 raise ClientError(message, status=status)
def set_header(self, name, value, iff=True):
    """Store value under name in self.headers, when allowed.

    :param name: key to use in self.headers

    :param value: value to store; None means "do not set anything"

    :param iff: the header is stored only if this evaluates to true
    """
    # Guard clause: nothing to do for a missing value or a false condition
    if value is None or not iff:
        return
    self.headers[name] = value
368 def set_param(self, name, value=None, iff=True):
370 self.params[name] = value
374 async_headers=dict(), async_params=dict(),
376 """Commit an HTTP request to base_url/path
377 Requests are commited to and performed by Request/ResponseManager
378 These classes perform a lazy http request. Present method, by default,
379 enforces them to perform the http call. Hint: call present method with
380 success=None to get a non-performed ResponseManager object.
382 assert isinstance(method, str) or isinstance(method, unicode)
384 assert isinstance(path, str) or isinstance(path, unicode)
386 headers = dict(self.headers)
387 headers.update(async_headers)
388 params = dict(self.params)
389 params.update(async_params)
390 success = kwargs.pop('success', 200)
391 data = kwargs.pop('data', None)
392 headers.setdefault('X-Auth-Token', self.token)
394 data = dumps(kwargs.pop('json'))
395 headers.setdefault('Content-Type', 'application/json')
397 headers.setdefault('Content-Length', '%s' % len(data))
399 sendlog.debug('COMMIT %s @ %s\t[%s]', method, self.base_url, self)
400 req = RequestManager(
401 method, self.base_url, path,
402 data=data, headers=headers, params=params)
404 r = ResponseManager(req)
406 self.headers = dict()
409 if success is not None:
410 # Success can either be an int or a collection
411 success = (success,) if isinstance(success, int) else success
412 if r.status_code not in success:
413 self._raise_for_status(r)
def delete(self, path, **kwargs):
    """Shortcut for self.request with 'delete' as the method.

    :param path: forwarded unchanged to self.request

    :param kwargs: forwarded unchanged to self.request

    :returns: whatever self.request returns
    """
    http_verb = 'delete'
    return self.request(http_verb, path, **kwargs)
def get(self, path, **kwargs):
    """Shortcut for self.request with 'get' as the method.

    :param path: forwarded unchanged to self.request

    :param kwargs: forwarded unchanged to self.request

    :returns: whatever self.request returns
    """
    http_verb = 'get'
    return self.request(http_verb, path, **kwargs)
def head(self, path, **kwargs):
    """Shortcut for self.request with 'head' as the method.

    :param path: forwarded unchanged to self.request

    :param kwargs: forwarded unchanged to self.request

    :returns: whatever self.request returns
    """
    http_verb = 'head'
    return self.request(http_verb, path, **kwargs)
def post(self, path, **kwargs):
    """Shortcut for self.request with 'post' as the method.

    :param path: forwarded unchanged to self.request

    :param kwargs: forwarded unchanged to self.request

    :returns: whatever self.request returns
    """
    http_verb = 'post'
    return self.request(http_verb, path, **kwargs)
def put(self, path, **kwargs):
    """Shortcut for self.request with 'put' as the method.

    :param path: forwarded unchanged to self.request

    :param kwargs: forwarded unchanged to self.request

    :returns: whatever self.request returns
    """
    http_verb = 'put'
    return self.request(http_verb, path, **kwargs)
def copy(self, path, **kwargs):
    """Shortcut for self.request with 'copy' as the method.

    :param path: forwarded unchanged to self.request

    :param kwargs: forwarded unchanged to self.request

    :returns: whatever self.request returns
    """
    http_verb = 'copy'
    return self.request(http_verb, path, **kwargs)
def move(self, path, **kwargs):
    """Shortcut for self.request with 'move' as the method.

    :param path: forwarded unchanged to self.request

    :param kwargs: forwarded unchanged to self.request

    :returns: whatever self.request returns
    """
    http_verb = 'move'
    return self.request(http_verb, path, **kwargs)