1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from urllib2 import quote, unquote
35 from urlparse import urlparse
36 from threading import Thread
37 from json import dumps, loads
39 from httplib import ResponseNotReady
40 from time import sleep
41 from random import random
42 from logging import getLogger
44 from objpool.http import PooledHTTPConnection
# Budget, in seconds, that RequestManager.perform starts its keep_trying
# countdown from while it waits for a response
TIMEOUT = 60.0  # seconds

# Upper-case method names accepted by RequestManager (checked with an assert)
HTTP_METHODS = [
    'GET',
    'POST',
    'PUT',
    'HEAD',
    'DELETE',
    'COPY',
    'MOVE',
]

# One general logger plus two children: <module>.send and <module>.recv
log = getLogger(__name__)
sendlog = getLogger(__name__ + '.send')
recvlog = getLogger(__name__ + '.recv')
56 if v and isinstance(v, unicode):
57 return quote(v.encode('utf-8'))
61 class ClientError(Exception):
62 def __init__(self, message, status=0, details=None):
63 log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
68 message += '' if message and message[-1] == '\n' else '\n'
69 serv_stat, sep, new_msg = message.partition('{')
70 new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71 json_msg = loads(new_msg)
72 key = json_msg.keys()[0]
73 serv_stat = serv_stat.strip()
75 json_msg = json_msg[key]
76 message = '%s %s (%s)\n' % (
79 json_msg['message']) if (
80 'message' in json_msg) else '%s %s' % (serv_stat, key)
81 status = json_msg.get('code', status)
82 if 'details' in json_msg:
85 if not isinstance(details, list):
87 if json_msg['details']:
88 details.append(json_msg['details'])
92 while message.endswith('\n\n'):
93 message = message[:-1]
94 super(ClientError, self).__init__(message)
95 self.status = status if isinstance(status, int) else 0
96 self.details = details if details else []
105 class RequestManager(Logged):
106 """Handle http request information"""
108 def _connection_info(self, url, path, params={}):
109 """ Set self.url to scheme://netloc/?params
110 :param url: (str or unicode) The service url
112 :param path: (str or unicode) The service path (url/path)
114 :param params: (dict) Parameters to add to final url
116 :returns: (scheme, netloc)
118 url = _encode(str(url)) if url else 'http://127.0.0.1/'
119 url += '' if url.endswith('/') else '/'
121 url += _encode(path[1:] if path.startswith('/') else path)
123 for key, val in params.items():
125 url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
127 parsed = urlparse(url)
129 self.path = parsed.path or '/'
131 self.path += '?%s' % parsed.query
132 return (parsed.scheme, parsed.netloc)
135 self, method, url, path,
136 data=None, headers={}, params={}):
137 method = method.upper()
138 assert method in HTTP_METHODS, 'Invalid http method %s' % method
140 assert isinstance(headers, dict)
141 self.headers = dict(headers)
142 self.method, self.data = method, data
143 self.scheme, self.netloc = self._connection_info(url, path, params)
146 sendlog.info('%s %s://%s%s\t[%s]' % (
152 for key, val in self.headers.items():
153 if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
155 sendlog.info(' %s: %s\t[%s]' % (key, val, self))
157 sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
159 sendlog.info(self.data)
161 sendlog.info('data size:0\t[%s]' % self)
164 def perform(self, conn):
166 :param conn: (httplib connection object)
168 :returns: (HTTPResponse)
171 method=str(self.method.upper()),
173 headers=self.headers,
176 keep_trying = TIMEOUT
177 while keep_trying > 0:
179 return conn.getresponse()
180 except ResponseNotReady:
181 wait = 0.03 * random()
184 logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
185 recvlog.debug(logmsg)
186 raise ClientError('HTTPResponse takes too long - kamaki timeout')
189 class ResponseManager(Logged):
190 """Manage the http request and handle the response data, headers, etc."""
192 def __init__(self, request, poolsize=None):
194 :param request: (RequestManager)
196 self.request = request
197 self._request_performed = False
198 self.poolsize = poolsize
200 def _get_response(self):
201 if self._request_performed:
204 pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
206 with PooledHTTPConnection(
207 self.request.netloc, self.request.scheme,
208 **pool_kw) as connection:
209 self.request.LOG_TOKEN = self.LOG_TOKEN
210 self.request.LOG_DATA = self.LOG_DATA
211 r = self.request.perform(connection)
212 recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
213 self, r, self.request))
214 self._request_performed = True
215 self._status_code, self._status = r.status, unquote(r.reason)
217 '%d %s\t[p: %s]' % (self.status_code, self.status, self))
218 self._headers = dict()
219 for k, v in r.getheaders():
220 if (not self.LOG_TOKEN) and k.lower() == 'x-auth-token':
224 recvlog.info(' %s: %s\t[p: %s]' % (k, v, self))
225 self._content = r.read()
226 recvlog.info('data size: %s\t[p: %s]' % (
227 len(self._content) if self._content else 0,
229 if self.LOG_DATA and self._content:
230 recvlog.info('%s\t[p: %s]' % (self._content, self))
231 except Exception as err:
232 from traceback import format_stack
233 recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
235 'Failed while http-connecting to %s (%s)' % (
240 def status_code(self):
242 return self._status_code
262 :returns: (str) content
265 return '%s' % self._content
270 :returns: (dict) squeezed from json-formated content
274 return loads(self._content)
275 except ValueError as err:
276 raise ClientError('Response not formated in JSON - %s' % err)
279 class SilentEvent(Thread):
280 """Thread-run method(*args, **kwargs)"""
281 def __init__(self, method, *args, **kwargs):
282 super(self.__class__, self).__init__()
289 return getattr(self, '_exception', False)
293 return getattr(self, '_value', None)
297 self._value = self.method(*(self.args), **(self.kwargs))
298 except Exception as e:
299 recvlog.debug('Thread %s got exception %s\n<%s %s' % (
302 e.status if isinstance(e, ClientError) else '',
307 class Client(object):
311 '%a %b %d %H:%M:%S %Y',
312 '%A, %d-%b-%y %H:%M:%S GMT',
313 '%a, %d %b %Y %H:%M:%S GMT']
317 def __init__(self, base_url, token):
318 assert base_url, 'No base_url for client %s' % self
319 self.base_url = base_url
321 self.headers, self.params = dict(), dict()
323 def _init_thread_limit(self, limit=1):
324 assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
325 self._thread_limit = limit
326 self._elapsed_old = 0.0
327 self._elapsed_new = 0.0
329 def _watch_thread_limit(self, threadlist):
330 self._thread_limit = getattr(self, '_thread_limit', 1)
331 self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
332 self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
333 recvlog.debug('# running threads: %s' % len(threadlist))
334 if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
335 self._thread_limit < self.MAX_THREADS):
336 self._thread_limit += 1
337 elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
338 self._thread_limit -= 1
340 self._elapsed_old = self._elapsed_new
341 if len(threadlist) >= self._thread_limit:
342 self._elapsed_new = 0.0
343 for thread in threadlist:
346 self._elapsed_new += time() - begin_time
347 self._elapsed_new = self._elapsed_new / len(threadlist)
351 def _raise_for_status(self, r):
352 log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
353 status_msg = getattr(r, 'status', None) or ''
355 message = '%s %s\n' % (status_msg, r.text)
357 message = '%s %s\n' % (status_msg, r)
358 status = getattr(r, 'status_code', getattr(r, 'status', 0))
359 raise ClientError(message, status=status)
361 def set_header(self, name, value, iff=True):
362 """Set a header 'name':'value'"""
363 if value is not None and iff:
364 self.headers[name] = value
366 def set_param(self, name, value=None, iff=True):
368 self.params[name] = value
372 async_headers=dict(), async_params=dict(),
374 """Commit an HTTP request to base_url/path
375 Requests are commited to and performed by Request/ResponseManager
376 These classes perform a lazy http request. Present method, by default,
377 enforces them to perform the http call. Hint: call present method with
378 success=None to get a non-performed ResponseManager object.
380 assert isinstance(method, str) or isinstance(method, unicode)
382 assert isinstance(path, str) or isinstance(path, unicode)
384 headers = dict(self.headers)
385 headers.update(async_headers)
386 params = dict(self.params)
387 params.update(async_params)
388 success = kwargs.pop('success', 200)
389 data = kwargs.pop('data', None)
390 headers.setdefault('X-Auth-Token', self.token)
392 data = dumps(kwargs.pop('json'))
393 headers.setdefault('Content-Type', 'application/json')
395 headers.setdefault('Content-Length', '%s' % len(data))
397 sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
398 req = RequestManager(
399 method, self.base_url, path,
400 data=data, headers=headers, params=params)
402 r = ResponseManager(req)
403 r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
405 self.headers = dict()
408 if success is not None:
409 # Success can either be an int or a collection
410 success = (success,) if isinstance(success, int) else success
411 if r.status_code not in success:
412 self._raise_for_status(r)
415 def delete(self, path, **kwargs):
416 return self.request('delete', path, **kwargs)
418 def get(self, path, **kwargs):
419 return self.request('get', path, **kwargs)
421 def head(self, path, **kwargs):
422 return self.request('head', path, **kwargs)
424 def post(self, path, **kwargs):
425 return self.request('post', path, **kwargs)
427 def put(self, path, **kwargs):
428 return self.request('put', path, **kwargs)
430 def copy(self, path, **kwargs):
431 return self.request('copy', path, **kwargs)
433 def move(self, path, **kwargs):
434 return self.request('move', path, **kwargs)