1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, self.list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, self.list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from urllib2 import quote, unquote
35 from urlparse import urlparse
36 from threading import Thread
37 from json import dumps, loads
39 from httplib import ResponseNotReady
40 from time import sleep
41 from random import random
42 from logging import getLogger
44 from objpool.http import PooledHTTPConnection
47 TIMEOUT = 60.0 # seconds
48 HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
50 log = getLogger(__name__)
51 sendlog = getLogger('%s.send' % __name__)
52 recvlog = getLogger('%s.recv' % __name__)
56 if v and isinstance(v, unicode):
57 return quote(v.encode('utf-8'))
61 class ClientError(Exception):
62 def __init__(self, message, status=0, details=None):
63 log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
68 message += '' if message and message[-1] == '\n' else '\n'
69 serv_stat, sep, new_msg = message.partition('{')
70 new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71 json_msg = loads(new_msg)
72 key = json_msg.keys()[0]
73 serv_stat = serv_stat.strip()
75 json_msg = json_msg[key]
76 message = '%s %s (%s)\n' % (
79 json_msg['message']) if (
80 'message' in json_msg) else '%s %s' % (serv_stat, key)
81 status = json_msg.get('code', status)
82 if 'details' in json_msg:
85 if not isinstance(details, list):
87 if json_msg['details']:
88 details.append(json_msg['details'])
92 while message.endswith('\n\n'):
93 message = message[:-1]
94 super(ClientError, self).__init__(message)
95 self.status = status if isinstance(status, int) else 0
96 self.details = details if details else []
105 class RequestManager(Logged):
106 """Handle http request information"""
108 def _connection_info(self, url, path, params={}):
109 """ Set self.url to scheme://netloc/?params
110 :param url: (str or unicode) The service url
112 :param path: (str or unicode) The service path (url/path)
114 :param params: (dict) Parameters to add to final url
116 :returns: (scheme, netloc)
118 url = _encode(str(url)) if url else 'http://127.0.0.1/'
119 url += '' if url.endswith('/') else '/'
121 url += _encode(path[1:] if path.startswith('/') else path)
123 for key, val in params.items():
125 url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
127 parsed = urlparse(url)
129 self.path = parsed.path or '/'
131 self.path += '?%s' % parsed.query
132 return (parsed.scheme, parsed.netloc)
135 self, method, url, path,
136 data=None, headers={}, params={}):
137 method = method.upper()
138 assert method in HTTP_METHODS, 'Invalid http method %s' % method
140 assert isinstance(headers, dict)
141 self.headers = dict(headers)
142 self.method, self.data = method, data
143 self.scheme, self.netloc = self._connection_info(url, path, params)
146 sendlog.info('%s %s://%s%s\t[%s]' % (
147 self.method, self.scheme, self.netloc, self.path, self))
148 for key, val in self.headers.items():
149 if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
151 sendlog.info(' %s: %s\t[%s]' % (key, val, self))
153 sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
155 sendlog.info(self.data)
157 sendlog.info('data size:0\t[%s]' % self)
160 def perform(self, conn):
162 :param conn: (httplib connection object)
164 :returns: (HTTPResponse)
167 method=str(self.method.upper()),
169 headers=self.headers,
172 keep_trying = TIMEOUT
173 while keep_trying > 0:
175 return conn.getresponse()
176 except ResponseNotReady:
177 wait = 0.03 * random()
180 logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
181 recvlog.debug(logmsg)
182 raise ClientError('HTTPResponse takes too long - kamaki timeout')
185 class ResponseManager(Logged):
186 """Manage the http request and handle the response data, headers, etc."""
188 def __init__(self, request, poolsize=None):
190 :param request: (RequestManager)
192 self.request = request
193 self._request_performed = False
194 self.poolsize = poolsize
196 def _get_response(self):
197 if self._request_performed:
200 pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
202 with PooledHTTPConnection(
203 self.request.netloc, self.request.scheme,
204 **pool_kw) as connection:
205 self.request.LOG_TOKEN = self.LOG_TOKEN
206 self.request.LOG_DATA = self.LOG_DATA
207 r = self.request.perform(connection)
208 recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
209 self, r, self.request))
210 self._request_performed = True
211 self._status_code, self._status = r.status, unquote(r.reason)
213 '%d %s\t[p: %s]' % (self.status_code, self.status, self))
214 self._headers = dict()
215 for k, v in r.getheaders():
216 if (not self.LOG_TOKEN) and k.lower() == 'x-auth-token':
220 recvlog.info(' %s: %s\t[p: %s]' % (k, v, self))
221 self._content = r.read()
222 recvlog.info('data size: %s\t[p: %s]' % (
223 len(self._content) if self._content else 0,
225 if self.LOG_DATA and self._content:
226 recvlog.info('%s\t[p: %s]' % (self._content, self))
227 except Exception as err:
228 from traceback import format_stack
229 recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
231 'Failed while http-connecting to %s (%s)' % (
236 def status_code(self):
238 return self._status_code
258 :returns: (str) content
261 return '%s' % self._content
266 :returns: (dict) squeezed from json-formated content
270 return loads(self._content)
271 except ValueError as err:
272 raise ClientError('Response not formated in JSON - %s' % err)
275 class SilentEvent(Thread):
276 """Thread-run method(*args, **kwargs)"""
277 def __init__(self, method, *args, **kwargs):
278 super(self.__class__, self).__init__()
285 return getattr(self, '_exception', False)
289 return getattr(self, '_value', None)
293 self._value = self.method(*(self.args), **(self.kwargs))
294 except Exception as e:
295 recvlog.debug('Thread %s got exception %s\n<%s %s' % (
298 e.status if isinstance(e, ClientError) else '',
303 class Client(object):
307 '%a %b %d %H:%M:%S %Y',
308 '%A, %d-%b-%y %H:%M:%S GMT',
309 '%a, %d %b %Y %H:%M:%S GMT']
313 def __init__(self, base_url, token):
314 assert base_url, 'No base_url for client %s' % self
315 self.base_url = base_url
317 self.headers, self.params = dict(), dict()
319 def _init_thread_limit(self, limit=1):
320 assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
321 self._thread_limit = limit
322 self._elapsed_old = 0.0
323 self._elapsed_new = 0.0
325 def _watch_thread_limit(self, threadlist):
326 self._thread_limit = getattr(self, '_thread_limit', 1)
327 self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
328 self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
329 recvlog.debug('# running threads: %s' % len(threadlist))
330 if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
331 self._thread_limit < self.MAX_THREADS):
332 self._thread_limit += 1
333 elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
334 self._thread_limit -= 1
336 self._elapsed_old = self._elapsed_new
337 if len(threadlist) >= self._thread_limit:
338 self._elapsed_new = 0.0
339 for thread in threadlist:
342 self._elapsed_new += time() - begin_time
343 self._elapsed_new = self._elapsed_new / len(threadlist)
347 def _raise_for_status(self, r):
348 log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
349 status_msg = getattr(r, 'status', None) or ''
351 message = '%s %s\n' % (status_msg, r.text)
353 message = '%s %s\n' % (status_msg, r)
354 status = getattr(r, 'status_code', getattr(r, 'status', 0))
355 raise ClientError(message, status=status)
357 def set_header(self, name, value, iff=True):
358 """Set a header 'name':'value'"""
359 if value is not None and iff:
360 self.headers[name] = value
362 def set_param(self, name, value=None, iff=True):
364 self.params[name] = value
368 async_headers=dict(), async_params=dict(),
370 """Commit an HTTP request to base_url/path
371 Requests are commited to and performed by Request/ResponseManager
372 These classes perform a lazy http request. Present method, by default,
373 enforces them to perform the http call. Hint: call present method with
374 success=None to get a non-performed ResponseManager object.
376 assert isinstance(method, str) or isinstance(method, unicode)
378 assert isinstance(path, str) or isinstance(path, unicode)
380 headers = dict(self.headers)
381 headers.update(async_headers)
382 params = dict(self.params)
383 params.update(async_params)
384 success = kwargs.pop('success', 200)
385 data = kwargs.pop('data', None)
386 headers.setdefault('X-Auth-Token', self.token)
388 data = dumps(kwargs.pop('json'))
389 headers.setdefault('Content-Type', 'application/json')
391 headers.setdefault('Content-Length', '%s' % len(data))
393 sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
394 req = RequestManager(
395 method, self.base_url, path,
396 data=data, headers=headers, params=params)
398 r = ResponseManager(req)
399 r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
401 self.headers = dict()
404 if success is not None:
405 # Success can either be an int or a collection
406 success = (success,) if isinstance(success, int) else success
407 if r.status_code not in success:
408 self._raise_for_status(r)
411 def delete(self, path, **kwargs):
412 return self.request('delete', path, **kwargs)
414 def get(self, path, **kwargs):
415 return self.request('get', path, **kwargs)
417 def head(self, path, **kwargs):
418 return self.request('head', path, **kwargs)
420 def post(self, path, **kwargs):
421 return self.request('post', path, **kwargs)
423 def put(self, path, **kwargs):
424 return self.request('put', path, **kwargs)
426 def copy(self, path, **kwargs):
427 return self.request('copy', path, **kwargs)
429 def move(self, path, **kwargs):
430 return self.request('move', path, **kwargs)