1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from urllib2 import quote, unquote
35 from urlparse import urlparse
36 from threading import Thread
37 from json import dumps, loads
39 from httplib import ResponseNotReady, HTTPException
40 from time import sleep
41 from random import random
42 from logging import getLogger
44 from objpool.http import PooledHTTPConnection
# Time budget, in seconds, for polling a connection for its response
TIMEOUT = 60.0

# Upper-case HTTP verbs a request is allowed to use
HTTP_METHODS = [
    'GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE',
]

# Module logger, plus separate loggers for outgoing and incoming traffic
log = getLogger(__name__)
sendlog = getLogger('%s.send' % __name__)
recvlog = getLogger('%s.recv' % __name__)
56 if v and isinstance(v, unicode):
57 return quote(v.encode('utf-8'))
61 class ClientError(Exception):
62 def __init__(self, message, status=0, details=None):
63 log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
68 message += '' if message and message[-1] == '\n' else '\n'
69 serv_stat, sep, new_msg = message.partition('{')
70 new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71 json_msg = loads(new_msg)
72 key = json_msg.keys()[0]
73 serv_stat = serv_stat.strip()
75 json_msg = json_msg[key]
76 message = '%s %s (%s)\n' % (
79 json_msg['message']) if (
80 'message' in json_msg) else '%s %s' % (serv_stat, key)
81 status = json_msg.get('code', status)
82 if 'details' in json_msg:
85 if not isinstance(details, list):
87 if json_msg['details']:
88 details.append(json_msg['details'])
92 while message.endswith('\n\n'):
93 message = message[:-1]
94 super(ClientError, self).__init__(message)
95 self.status = status if isinstance(status, int) else 0
96 self.details = details if details else []
107 class RequestManager(Logged):
108 """Handle http request information"""
110 def _connection_info(self, url, path, params={}):
111 """ Set self.url to scheme://netloc/?params
112 :param url: (str or unicode) The service url
114 :param path: (str or unicode) The service path (url/path)
116 :param params: (dict) Parameters to add to final url
118 :returns: (scheme, netloc)
120 url = url or 'http://127.0.0.1/'
121 url += '' if url.endswith('/') else '/'
123 url += _encode(path[1:] if path.startswith('/') else path)
125 for key, val in params.items():
126 val = quote('' if val in (None, False) else '%s' % _encode(val))
127 url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
129 parsed = urlparse(url)
130 self.url = '%s' % url
131 self.path = (('%s' % parsed.path) if parsed.path else '/') + (
132 '?%s' % parsed.query if parsed.query else '')
133 return (parsed.scheme, parsed.netloc)
136 self, method, url, path,
137 data=None, headers={}, params={}):
138 method = method.upper()
139 assert method in HTTP_METHODS, 'Invalid http method %s' % method
141 assert isinstance(headers, dict)
142 self.headers = dict(headers)
143 self.method, self.data = method, data
144 self.scheme, self.netloc = self._connection_info(url, path, params)
147 plog = ('\t[%s]' % self) if self.LOG_PID else ''
148 sendlog.info('- - - - - - -')
149 sendlog.info('%s %s://%s%s%s' % (
150 self.method, self.scheme, self.netloc, self.path, plog))
151 for key, val in self.headers.items():
152 if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
153 self._token, val = val, '...'
154 sendlog.info(' %s: %s%s' % (key, val, plog))
156 sendlog.info('data size: %s%s' % (len(self.data), plog))
158 sendlog.info(self.data.replace(self._token, '...') if (
159 self._token) else self.data)
161 sendlog.info('data size: 0%s' % plog)
163 def _encode_headers(self):
164 headers = dict(self.headers)
165 for k, v in self.headers.items():
167 v.encode('utf-8') if isinstance(v, unicode) else v)
168 self.headers = headers
170 def perform(self, conn):
172 :param conn: (httplib connection object)
174 :returns: (HTTPResponse)
176 self._encode_headers()
179 method=self.method.upper(),
180 url=self.path.encode('utf-8'),
181 headers=self.headers,
184 keep_trying = TIMEOUT
185 while keep_trying > 0:
187 return conn.getresponse()
188 except ResponseNotReady:
189 wait = 0.03 * random()
192 plog = ('\t[%s]' % self) if self.LOG_PID else ''
193 logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
194 recvlog.debug(logmsg)
195 raise ClientError('HTTPResponse takes too long - kamaki timeout')
198 class ResponseManager(Logged):
199 """Manage the http request and handle the response data, headers, etc."""
def __init__(self, request, poolsize=None, connection_retry_limit=0):
    """Keep the request and the connection settings; only attributes are
    initialized here

    :param request: (RequestManager)

    :param poolsize: (int) the size of the connection pool

    :param connection_retry_limit: (int) how many extra connection
        attempts are allowed on top of the first one
    """
    # Total attempts = the first try + the permitted retries
    self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
    self.poolsize = poolsize
    self.request = request
    self._request_performed = False
    self._headers_to_decode = []
    self._header_prefices = []
215 def _get_headers_to_decode(self, headers):
216 keys = set([k.lower() for k, v in headers])
217 encodable = list(keys.intersection(self.headers_to_decode))
220 for k in self.header_prefices:
224 return encodable + filter(has_prefix, keys.difference(encodable))
226 def _get_response(self):
227 if self._request_performed:
230 pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
231 for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
233 with PooledHTTPConnection(
234 self.request.netloc, self.request.scheme,
235 **pool_kw) as connection:
236 self.request.LOG_TOKEN = self.LOG_TOKEN
237 self.request.LOG_DATA = self.LOG_DATA
238 self.request.LOG_PID = self.LOG_PID
239 r = self.request.perform(connection)
242 recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
243 self, r, self.request))
244 plog = '\t[%s]' % self
245 self._request_performed = True
246 self._status_code, self._status = r.status, unquote(
250 self.status_code, self.status, plog))
251 self._headers = dict()
253 r_headers = r.getheaders()
254 enc_headers = self._get_headers_to_decode(r_headers)
255 for k, v in r_headers:
256 self._headers[k] = unquote(v).decode('utf-8') if (
257 k.lower()) in enc_headers else v
258 recvlog.info(' %s: %s%s' % (k, v, plog))
259 self._content = r.read()
260 recvlog.info('data size: %s%s' % (
261 len(self._content) if self._content else 0, plog))
262 if self.LOG_DATA and self._content:
263 data = '%s%s' % (self._content, plog)
265 data = data.replace(self._token, '...')
267 recvlog.info('- - - - - - -')
269 except Exception as err:
270 if isinstance(err, HTTPException):
271 if retries >= self.CONNECTION_TRY_LIMIT:
273 'Connection to %s failed %s times (%s: %s )' % (
274 self.request.url, retries, type(err), err))
276 from traceback import format_stack
278 '\n'.join(['%s' % type(err)] + format_stack()))
282 def status_code(self):
284 return self._status_code
304 :returns: (str) content
307 return '%s' % self._content
def headers_to_decode(self):
    """:returns: (list) the header names registered for decoding"""
    registered = self._headers_to_decode
    return registered
@headers_to_decode.setter
def headers_to_decode(self, header_keys):
    """Add header names to the ones already registered

    Assignment extends the stored collection instead of replacing it;
    names are lower-cased and duplicates are dropped.

    :param header_keys: (iterable of str) header names to add
    """
    registered = self._headers_to_decode
    registered.extend([name.lower() for name in header_keys])
    self._headers_to_decode = list(set(registered))
def header_prefices(self):
    """:returns: (list) the registered header key prefixes"""
    registered = self._header_prefices
    return registered
@header_prefices.setter
def header_prefices(self, header_key_prefices):
    """Add header key prefixes to the ones already registered

    Assignment extends the stored collection instead of replacing it;
    prefixes are lower-cased and duplicates are dropped.

    :param header_key_prefices: (iterable of str) prefixes to add
    """
    registered = self._header_prefices
    registered.extend([prefix.lower() for prefix in header_key_prefices])
    self._header_prefices = list(set(registered))
330 :returns: (dict) squeezed from json-formated content
334 return loads(self._content)
335 except ValueError as err:
336 raise ClientError('Response not formated in JSON - %s' % err)
339 class SilentEvent(Thread):
340 """Thread-run method(*args, **kwargs)"""
def __init__(self, method, *args, **kwargs):
    """Store the callable and its arguments for the later
    method(*args, **kwargs) call

    :param method: (callable) the operation this thread will execute

    :param args: positional arguments to pass to method

    :param kwargs: keyword arguments to pass to method
    """
    # Name the class explicitly: super(self.__class__, self) resolves to
    # the subclass itself when SilentEvent is subclassed, so the call
    # would re-enter this method and recurse without end
    super(SilentEvent, self).__init__()
    self.method, self.args, self.kwargs = method, args, kwargs
347 return getattr(self, '_exception', False)
351 return getattr(self, '_value', None)
355 self._value = self.method(*(self.args), **(self.kwargs))
356 except Exception as e:
357 recvlog.debug('Thread %s got exception %s\n<%s %s' % (
360 e.status if isinstance(e, ClientError) else '',
365 class Client(Logged):
368 DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
369 CONNECTION_RETRY_LIMIT = 0
371 def __init__(self, base_url, token):
372 assert base_url, 'No base_url for client %s' % self
373 self.base_url = base_url
375 self.headers, self.params = dict(), dict()
377 self.response_headers = []
378 self.response_header_prefices = []
def _init_thread_limit(self, limit=1):
    """Reset the thread limit and both elapsed-time measurements

    :param limit: (int) positive value to start the thread limit from

    :raises AssertionError: if limit is not a positive int
    """
    is_positive_int = isinstance(limit, int) and limit > 0
    assert is_positive_int, 'Thread limit not a +int'
    self._thread_limit = limit
    self._elapsed_old = self._elapsed_new = 0.0
386 def _watch_thread_limit(self, threadlist):
387 self._thread_limit = getattr(self, '_thread_limit', 1)
388 self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
389 self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
390 recvlog.debug('# running threads: %s' % len(threadlist))
391 if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
392 self._thread_limit < self.MAX_THREADS):
393 self._thread_limit += 1
394 elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
395 self._thread_limit -= 1
397 self._elapsed_old = self._elapsed_new
398 if len(threadlist) >= self._thread_limit:
399 self._elapsed_new = 0.0
400 for thread in threadlist:
403 self._elapsed_new += time() - begin_time
404 self._elapsed_new = self._elapsed_new / len(threadlist)
408 def async_run(self, method, kwarg_list):
409 """Fire threads of operations
411 :param method: the method to run in each thread
413 :param kwarg_list: (list of dicts) the arguments to pass in each method
416 :returns: (list) the results of each method call w.r. to the order of
419 flying, results = {}, {}
420 self._init_thread_limit()
421 for index, kwargs in enumerate(kwarg_list):
422 self._watch_thread_limit(flying.values())
423 flying[index] = SilentEvent(method=method, **kwargs)
424 flying[index].start()
426 for key, thread in flying.items():
428 unfinished[key] = thread
429 elif thread.exception:
430 raise thread.exception
432 results[key] = thread.value
434 sendlog.info('- - - wait for threads to finish')
435 for key, thread in flying.items():
439 raise thread.exception
440 results[key] = thread.value
441 return results.values()
443 def _raise_for_status(self, r):
444 log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
445 status_msg = getattr(r, 'status', None) or ''
447 message = '%s %s\n' % (status_msg, r.text)
449 message = '%s %s\n' % (status_msg, r)
450 status = getattr(r, 'status_code', getattr(r, 'status', 0))
451 raise ClientError(message, status=status)
def set_header(self, name, value, iff=True):
    """Set a header 'name':'value'

    Nothing is stored when value is None or when iff evaluates to false.
    Both the name and the value are stored in their string form.
    """
    if value is None or not iff:
        return
    self.headers['%s' % name] = '%s' % value
458 def set_param(self, name, value=None, iff=True):
460 self.params[name] = '%s' % value
464 async_headers=dict(), async_params=dict(),
466 """Commit an HTTP request to base_url/path
467 Requests are commited to and performed by Request/ResponseManager
468 These classes perform a lazy http request. Present method, by default,
469 enforces them to perform the http call. Hint: call present method with
470 success=None to get a non-performed ResponseManager object.
472 assert isinstance(method, str) or isinstance(method, unicode)
474 assert isinstance(path, str) or isinstance(path, unicode)
476 headers = dict(self.headers)
477 headers.update(async_headers)
478 params = dict(self.params)
479 params.update(async_params)
480 success = kwargs.pop('success', 200)
481 data = kwargs.pop('data', None)
482 headers.setdefault('X-Auth-Token', self.token)
484 data = dumps(kwargs.pop('json'))
485 headers.setdefault('Content-Type', 'application/json')
487 headers.setdefault('Content-Length', '%s' % len(data))
488 plog = ('\t[%s]' % self) if self.LOG_PID else ''
489 sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
490 req = RequestManager(
491 method, self.base_url, path,
492 data=data, headers=headers, params=params)
496 poolsize=self.poolsize,
497 connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
498 r.headers_to_decode = self.response_headers
499 r.header_prefices = self.response_header_prefices
500 r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
501 self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
502 r._token = headers['X-Auth-Token']
504 self.headers = dict()
507 if success is not None:
508 # Success can either be an int or a collection
509 success = (success,) if isinstance(success, int) else success
510 if r.status_code not in success:
511 self._raise_for_status(r)
def delete(self, path, **kwargs):
    """Shortcut for request() with the 'delete' method

    :param path: the path handed over to request()

    :returns: whatever request() returns
    """
    verb = 'delete'
    return self.request(verb, path, **kwargs)
def get(self, path, **kwargs):
    """Shortcut for request() with the 'get' method

    :param path: the path handed over to request()

    :returns: whatever request() returns
    """
    verb = 'get'
    return self.request(verb, path, **kwargs)
def head(self, path, **kwargs):
    """Shortcut for request() with the 'head' method

    :param path: the path handed over to request()

    :returns: whatever request() returns
    """
    verb = 'head'
    return self.request(verb, path, **kwargs)
def post(self, path, **kwargs):
    """Shortcut for request() with the 'post' method

    :param path: the path handed over to request()

    :returns: whatever request() returns
    """
    verb = 'post'
    return self.request(verb, path, **kwargs)
def put(self, path, **kwargs):
    """Shortcut for request() with the 'put' method

    :param path: the path handed over to request()

    :returns: whatever request() returns
    """
    verb = 'put'
    return self.request(verb, path, **kwargs)
def copy(self, path, **kwargs):
    """Shortcut for request() with the 'copy' method

    :param path: the path handed over to request()

    :returns: whatever request() returns
    """
    verb = 'copy'
    return self.request(verb, path, **kwargs)
def move(self, path, **kwargs):
    """Shortcut for request() with the 'move' method

    :param path: the path handed over to request()

    :returns: whatever request() returns
    """
    verb = 'move'
    return self.request(verb, path, **kwargs)
536 class Waiter(object):
539 self, item_id, wait_status, get_status,
540 delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
541 """Wait while the item is still in wait_status or to reach it
543 :param server_id: integer (str or int)
545 :param wait_status: (str)
547 :param get_status: (method(self, item_id)) if called, returns
548 (status, progress %) If no way to tell progress, return None
550 :param delay: time interval between retries
552 :param wait_cb: (method(total steps)) returns a generator for
553 reporting progress or timeouts i.e., for a progress bar
555 :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)
557 :returns: (str) the new mode if successful, (bool) False if timed out
559 status, progress = get_status(self, item_id)
562 wait_gen = wait_cb(max_wait // delay)
565 if wait_for_status ^ (status != wait_status):
572 old_wait = total_wait = 0
574 while (wait_for_status ^ (status == wait_status)) and (
575 total_wait <= max_wait):
578 for i in range(total_wait - old_wait):
582 old_wait = total_wait
583 total_wait = progress or total_wait + 1
585 status, progress = get_status(self, item_id)
587 if total_wait < max_wait:
590 for i in range(max_wait):
594 return status if (wait_for_status ^ (status != wait_status)) else False
597 self, item_id, target_status, get_status,
598 delay=1, max_wait=100, wait_cb=None):
600 item_id, target_status, get_status, delay, max_wait, wait_cb,
601 wait_for_status=True)
604 self, item_id, target_status, get_status,
605 delay=1, max_wait=100, wait_cb=None):
607 item_id, target_status, get_status, delay, max_wait, wait_cb,
608 wait_for_status=False)