1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, self.list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, self.list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from urllib2 import quote, unquote
35 from urlparse import urlparse
36 from threading import Thread
37 from json import dumps, loads
39 from httplib import ResponseNotReady, HTTPException
40 from time import sleep
41 from random import random
42 from logging import getLogger
44 from objpool.http import PooledHTTPConnection
47 TIMEOUT = 60.0 # seconds
48 HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
50 log = getLogger(__name__)
51 sendlog = getLogger('%s.send' % __name__)
52 recvlog = getLogger('%s.recv' % __name__)
56 if v and isinstance(v, unicode):
57 return quote(v.encode('utf-8'))
61 class ClientError(Exception):
62 def __init__(self, message, status=0, details=None):
63 log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
68 message += '' if message and message[-1] == '\n' else '\n'
69 serv_stat, sep, new_msg = message.partition('{')
70 new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71 json_msg = loads(new_msg)
72 key = json_msg.keys()[0]
73 serv_stat = serv_stat.strip()
75 json_msg = json_msg[key]
76 message = '%s %s (%s)\n' % (
79 json_msg['message']) if (
80 'message' in json_msg) else '%s %s' % (serv_stat, key)
81 status = json_msg.get('code', status)
82 if 'details' in json_msg:
85 if not isinstance(details, list):
87 if json_msg['details']:
88 details.append(json_msg['details'])
92 while message.endswith('\n\n'):
93 message = message[:-1]
94 super(ClientError, self).__init__(message)
95 self.status = status if isinstance(status, int) else 0
96 self.details = details if details else []
107 class RequestManager(Logged):
108 """Handle http request information"""
110 def _connection_info(self, url, path, params={}):
111 """ Set self.url to scheme://netloc/?params
112 :param url: (str or unicode) The service url
114 :param path: (str or unicode) The service path (url/path)
116 :param params: (dict) Parameters to add to final url
118 :returns: (scheme, netloc)
120 url = _encode(str(url)) if url else 'http://127.0.0.1/'
121 url += '' if url.endswith('/') else '/'
123 url += _encode(path[1:] if path.startswith('/') else path)
125 for key, val in params.items():
126 val = '' if val in (None, False) else _encode(u'%s' % val)
127 url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
129 parsed = urlparse(url)
131 self.path = parsed.path or '/'
133 self.path += '?%s' % parsed.query
134 return (parsed.scheme, parsed.netloc)
137 self, method, url, path,
138 data=None, headers={}, params={}):
139 method = method.upper()
140 assert method in HTTP_METHODS, 'Invalid http method %s' % method
142 assert isinstance(headers, dict)
143 self.headers = dict(headers)
144 self.method, self.data = method, data
145 self.scheme, self.netloc = self._connection_info(url, path, params)
148 plog = '\t[%s]' if self.LOG_PID else ''
149 sendlog.info('- - - - - - -')
150 sendlog.info('%s %s://%s%s%s' % (
151 self.method, self.scheme, self.netloc, self.path, plog))
152 for key, val in self.headers.items():
153 if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
154 self._token, val = val, '...'
155 sendlog.info(' %s: %s%s' % (key, val, plog))
157 sendlog.info('data size:%s%s' % (len(self.data), plog))
159 sendlog.info(self.data.replace(self._token, '...') if (
160 self._token) else self.data)
162 sendlog.info('data size:0%s' % plog)
164 def perform(self, conn):
166 :param conn: (httplib connection object)
168 :returns: (HTTPResponse)
172 method=str(self.method.upper()),
174 headers=self.headers,
177 keep_trying = TIMEOUT
178 while keep_trying > 0:
180 return conn.getresponse()
181 except ResponseNotReady:
182 wait = 0.03 * random()
185 plog = '\t[%s]' if self.LOG_PID else ''
186 logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
187 recvlog.debug(logmsg)
188 raise ClientError('HTTPResponse takes too long - kamaki timeout')
191 class ResponseManager(Logged):
192 """Manage the http request and handle the response data, headers, etc."""
194 def __init__(self, request, poolsize=None, connection_retry_limit=0):
196 :param request: (RequestManager)
198 :param poolsize: (int) the size of the connection pool
200 :param connection_retry_limit: (int)
202 self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
203 self.request = request
204 self._request_performed = False
205 self.poolsize = poolsize
207 def _get_response(self):
208 if self._request_performed:
211 pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
212 for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
214 with PooledHTTPConnection(
215 self.request.netloc, self.request.scheme,
216 **pool_kw) as connection:
217 self.request.LOG_TOKEN = self.LOG_TOKEN
218 self.request.LOG_DATA = self.LOG_DATA
219 self.request.LOG_PID = self.LOG_PID
220 r = self.request.perform(connection)
223 recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
224 self, r, self.request))
225 plog = '\t[%s]' % self
226 self._request_performed = True
227 self._status_code, self._status = r.status, unquote(
231 self.status_code, self.status, plog))
232 self._headers = dict()
233 for k, v in r.getheaders():
234 if k.lower in ('x-auth-token', ) and (
236 self._token, v = v, '...'
239 recvlog.info(' %s: %s%s' % (k, v, plog))
240 self._content = r.read()
241 recvlog.info('data size: %s%s' % (
242 len(self._content) if self._content else 0, plog))
243 if self.LOG_DATA and self._content:
244 data = '%s%s' % (self._content, plog)
246 data = data.replace(self._token, '...')
248 sendlog.info('- - - - - - -')
250 except Exception as err:
251 if isinstance(err, HTTPException):
252 if retries >= self.CONNECTION_TRY_LIMIT:
254 'Connection to %s failed %s times (%s: %s )' % (
255 self.request.url, retries, type(err), err))
257 from traceback import format_stack
259 '\n'.join(['%s' % type(err)] + format_stack()))
261 'Failed while http-connecting to %s (%s)' % (
262 self.request.url, err))
265 def status_code(self):
267 return self._status_code
287 :returns: (str) content
290 return '%s' % self._content
295 :returns: (dict) squeezed from json-formated content
299 return loads(self._content)
300 except ValueError as err:
301 raise ClientError('Response not formated in JSON - %s' % err)
304 class SilentEvent(Thread):
305 """Thread-run method(*args, **kwargs)"""
306 def __init__(self, method, *args, **kwargs):
307 super(self.__class__, self).__init__()
314 return getattr(self, '_exception', False)
318 return getattr(self, '_value', None)
322 self._value = self.method(*(self.args), **(self.kwargs))
323 except Exception as e:
324 recvlog.debug('Thread %s got exception %s\n<%s %s' % (
327 e.status if isinstance(e, ClientError) else '',
332 class Client(Logged):
335 DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
336 CONNECTION_RETRY_LIMIT = 0
338 def __init__(self, base_url, token):
339 assert base_url, 'No base_url for client %s' % self
340 self.base_url = base_url
342 self.headers, self.params = dict(), dict()
344 def _init_thread_limit(self, limit=1):
345 assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
346 self._thread_limit = limit
347 self._elapsed_old = 0.0
348 self._elapsed_new = 0.0
350 def _watch_thread_limit(self, threadlist):
351 self._thread_limit = getattr(self, '_thread_limit', 1)
352 self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
353 self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
354 recvlog.debug('# running threads: %s' % len(threadlist))
355 if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
356 self._thread_limit < self.MAX_THREADS):
357 self._thread_limit += 1
358 elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
359 self._thread_limit -= 1
361 self._elapsed_old = self._elapsed_new
362 if len(threadlist) >= self._thread_limit:
363 self._elapsed_new = 0.0
364 for thread in threadlist:
367 self._elapsed_new += time() - begin_time
368 self._elapsed_new = self._elapsed_new / len(threadlist)
372 def async_run(self, method, kwarg_list):
373 """Fire threads of operations
375 :param method: the method to run in each thread
377 :param kwarg_list: (list of dicts) the arguments to pass in each method
380 :returns: (list) the results of each method call w.r. to the order of
383 flying, results = {}, {}
384 self._init_thread_limit()
385 for index, kwargs in enumerate(kwarg_list):
386 self._watch_thread_limit(flying.values())
387 flying[index] = SilentEvent(method=method, **kwargs)
388 flying[index].start()
390 for key, thread in flying.items():
392 unfinished[key] = thread
393 elif thread.exception:
394 print 'HERE IS AN EXCEPTION MK?'
395 raise thread.exception
397 results[key] = thread.value
398 print 'NO EXCEPTION', thread.value
400 sendlog.info('- - - wait for threads to finish')
401 for key, thread in flying.items():
404 elif thread.exception:
405 print 'HERE IS AN EXCEPTION MK-2?'
406 raise thread.exception
407 results[key] = thread.value
408 print 'NO EXCEPTION-2', thread.value
409 return results.values()
411 def _raise_for_status(self, r):
412 log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
413 status_msg = getattr(r, 'status', None) or ''
415 message = '%s %s\n' % (status_msg, r.text)
417 message = '%s %s\n' % (status_msg, r)
418 status = getattr(r, 'status_code', getattr(r, 'status', 0))
419 raise ClientError(message, status=status)
421 def set_header(self, name, value, iff=True):
422 """Set a header 'name':'value'"""
423 if value is not None and iff:
424 self.headers[name] = unicode(value)
426 def set_param(self, name, value=None, iff=True):
428 self.params[name] = unicode(value)
432 async_headers=dict(), async_params=dict(),
434 """Commit an HTTP request to base_url/path
435 Requests are commited to and performed by Request/ResponseManager
436 These classes perform a lazy http request. Present method, by default,
437 enforces them to perform the http call. Hint: call present method with
438 success=None to get a non-performed ResponseManager object.
440 assert isinstance(method, str) or isinstance(method, unicode)
442 assert isinstance(path, str) or isinstance(path, unicode)
444 headers = dict(self.headers)
445 headers.update(async_headers)
446 params = dict(self.params)
447 params.update(async_params)
448 success = kwargs.pop('success', 200)
449 data = kwargs.pop('data', None)
450 headers.setdefault('X-Auth-Token', self.token)
452 data = dumps(kwargs.pop('json'))
453 headers.setdefault('Content-Type', 'application/json')
455 headers.setdefault('Content-Length', '%s' % len(data))
457 plog = '\t[%s]' if self.LOG_PID else ''
458 sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
459 req = RequestManager(
460 method, self.base_url, path,
461 data=data, headers=headers, params=params)
464 req, connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
465 r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
466 self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
467 r._token = headers['X-Auth-Token']
469 self.headers = dict()
472 if success is not None:
473 # Success can either be an int or a collection
474 success = (success,) if isinstance(success, int) else success
475 if r.status_code not in success:
476 self._raise_for_status(r)
479 def delete(self, path, **kwargs):
480 return self.request('delete', path, **kwargs)
482 def get(self, path, **kwargs):
483 return self.request('get', path, **kwargs)
485 def head(self, path, **kwargs):
486 return self.request('head', path, **kwargs)
488 def post(self, path, **kwargs):
489 return self.request('post', path, **kwargs)
491 def put(self, path, **kwargs):
492 return self.request('put', path, **kwargs)
494 def copy(self, path, **kwargs):
495 return self.request('copy', path, **kwargs)
497 def move(self, path, **kwargs):
498 return self.request('move', path, **kwargs)