1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, self.list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, self.list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from urllib2 import quote, unquote
35 from urlparse import urlparse
36 from threading import Thread
37 from json import dumps, loads
39 from httplib import ResponseNotReady, HTTPException
40 from time import sleep
41 from random import random
42 from logging import getLogger
44 from objpool.http import PooledHTTPConnection
47 TIMEOUT = 60.0 # seconds
48 HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
50 log = getLogger(__name__)
51 sendlog = getLogger('%s.send' % __name__)
52 recvlog = getLogger('%s.recv' % __name__)
56 if v and isinstance(v, unicode):
57 return quote(v.encode('utf-8'))
61 class ClientError(Exception):
62 def __init__(self, message, status=0, details=None):
63 log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
68 message += '' if message and message[-1] == '\n' else '\n'
69 serv_stat, sep, new_msg = message.partition('{')
70 new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71 json_msg = loads(new_msg)
72 key = json_msg.keys()[0]
73 serv_stat = serv_stat.strip()
75 json_msg = json_msg[key]
76 message = '%s %s (%s)\n' % (
79 json_msg['message']) if (
80 'message' in json_msg) else '%s %s' % (serv_stat, key)
81 status = json_msg.get('code', status)
82 if 'details' in json_msg:
85 if not isinstance(details, list):
87 if json_msg['details']:
88 details.append(json_msg['details'])
92 while message.endswith('\n\n'):
93 message = message[:-1]
94 super(ClientError, self).__init__(message)
95 self.status = status if isinstance(status, int) else 0
96 self.details = details if details else []
107 class RequestManager(Logged):
108 """Handle http request information"""
110 def _connection_info(self, url, path, params={}):
111 """ Set self.url to scheme://netloc/?params
112 :param url: (str or unicode) The service url
114 :param path: (str or unicode) The service path (url/path)
116 :param params: (dict) Parameters to add to final url
118 :returns: (scheme, netloc)
120 url = _encode(str(url)) if url else 'http://127.0.0.1/'
121 url += '' if url.endswith('/') else '/'
123 url += _encode(path[1:] if path.startswith('/') else path)
125 for key, val in params.items():
126 val = '' if val in (None, False) else _encode('%s' % val)
127 url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
129 parsed = urlparse(url)
131 self.path = parsed.path or '/'
133 self.path += '?%s' % parsed.query
134 return (parsed.scheme, parsed.netloc)
137 self, method, url, path,
138 data=None, headers={}, params={}):
139 method = method.upper()
140 assert method in HTTP_METHODS, 'Invalid http method %s' % method
142 assert isinstance(headers, dict)
143 self.headers = dict(headers)
144 self.method, self.data = method, data
145 self.scheme, self.netloc = self._connection_info(url, path, params)
148 plog = ('\t[%s]' % self) if self.LOG_PID else ''
149 sendlog.info('- - - - - - -')
150 sendlog.info('%s %s://%s%s%s' % (
151 self.method, self.scheme, self.netloc, self.path, plog))
152 for key, val in self.headers.items():
153 if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
154 self._token, val = val, '...'
155 sendlog.info(' %s: %s%s' % (key, val, plog))
157 sendlog.info('data size:%s%s' % (len(self.data), plog))
159 sendlog.info(self.data.replace(self._token, '...') if (
160 self._token) else self.data)
162 sendlog.info('data size:0%s' % plog)
164 def perform(self, conn):
166 :param conn: (httplib connection object)
168 :returns: (HTTPResponse)
172 method=str(self.method.upper()),
174 headers=self.headers,
177 keep_trying = TIMEOUT
178 while keep_trying > 0:
180 return conn.getresponse()
181 except ResponseNotReady:
182 wait = 0.03 * random()
185 plog = ('\t[%s]' % self) if self.LOG_PID else ''
186 logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
187 recvlog.debug(logmsg)
188 raise ClientError('HTTPResponse takes too long - kamaki timeout')
191 class ResponseManager(Logged):
192 """Manage the http request and handle the response data, headers, etc."""
194 def __init__(self, request, poolsize=None, connection_retry_limit=0):
196 :param request: (RequestManager)
198 :param poolsize: (int) the size of the connection pool
200 :param connection_retry_limit: (int)
202 self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
203 self.request = request
204 self._request_performed = False
205 self.poolsize = poolsize
207 def _get_response(self):
208 if self._request_performed:
211 pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
212 for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
214 with PooledHTTPConnection(
215 self.request.netloc, self.request.scheme,
216 **pool_kw) as connection:
217 self.request.LOG_TOKEN = self.LOG_TOKEN
218 self.request.LOG_DATA = self.LOG_DATA
219 self.request.LOG_PID = self.LOG_PID
220 r = self.request.perform(connection)
223 recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
224 self, r, self.request))
225 plog = '\t[%s]' % self
226 self._request_performed = True
227 self._status_code, self._status = r.status, unquote(
231 self.status_code, self.status, plog))
232 self._headers = dict()
233 for k, v in r.getheaders():
234 if k.lower in ('x-auth-token', ) and (
236 self._token, v = v, '...'
239 recvlog.info(' %s: %s%s' % (k, v, plog))
240 self._content = r.read()
241 recvlog.info('data size: %s%s' % (
242 len(self._content) if self._content else 0, plog))
243 if self.LOG_DATA and self._content:
244 data = '%s%s' % (self._content, plog)
246 data = data.replace(self._token, '...')
248 recvlog.info('- - - - - - -')
250 except Exception as err:
251 if isinstance(err, HTTPException):
252 if retries >= self.CONNECTION_TRY_LIMIT:
254 'Connection to %s failed %s times (%s: %s )' % (
255 self.request.url, retries, type(err), err))
257 from traceback import format_stack
259 '\n'.join(['%s' % type(err)] + format_stack()))
261 'Failed while http-connecting to %s (%s)' % (
262 self.request.url, err))
265 def status_code(self):
267 return self._status_code
287 :returns: (str) content
290 return '%s' % self._content
295 :returns: (dict) squeezed from json-formated content
299 return loads(self._content)
300 except ValueError as err:
301 raise ClientError('Response not formated in JSON - %s' % err)
304 class SilentEvent(Thread):
305 """Thread-run method(*args, **kwargs)"""
306 def __init__(self, method, *args, **kwargs):
307 super(self.__class__, self).__init__()
314 return getattr(self, '_exception', False)
318 return getattr(self, '_value', None)
322 self._value = self.method(*(self.args), **(self.kwargs))
323 except Exception as e:
324 recvlog.debug('Thread %s got exception %s\n<%s %s' % (
327 e.status if isinstance(e, ClientError) else '',
332 class Client(Logged):
335 DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
336 CONNECTION_RETRY_LIMIT = 0
338 def __init__(self, base_url, token):
339 assert base_url, 'No base_url for client %s' % self
340 self.base_url = base_url
342 self.headers, self.params = dict(), dict()
345 def _init_thread_limit(self, limit=1):
346 assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
347 self._thread_limit = limit
348 self._elapsed_old = 0.0
349 self._elapsed_new = 0.0
351 def _watch_thread_limit(self, threadlist):
352 self._thread_limit = getattr(self, '_thread_limit', 1)
353 self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
354 self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
355 recvlog.debug('# running threads: %s' % len(threadlist))
356 if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
357 self._thread_limit < self.MAX_THREADS):
358 self._thread_limit += 1
359 elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
360 self._thread_limit -= 1
362 self._elapsed_old = self._elapsed_new
363 if len(threadlist) >= self._thread_limit:
364 self._elapsed_new = 0.0
365 for thread in threadlist:
368 self._elapsed_new += time() - begin_time
369 self._elapsed_new = self._elapsed_new / len(threadlist)
373 def async_run(self, method, kwarg_list):
374 """Fire threads of operations
376 :param method: the method to run in each thread
378 :param kwarg_list: (list of dicts) the arguments to pass in each method
381 :returns: (list) the results of each method call w.r. to the order of
384 flying, results = {}, {}
385 self._init_thread_limit()
386 for index, kwargs in enumerate(kwarg_list):
387 self._watch_thread_limit(flying.values())
388 flying[index] = SilentEvent(method=method, **kwargs)
389 flying[index].start()
391 for key, thread in flying.items():
393 unfinished[key] = thread
394 elif thread.exception:
395 raise thread.exception
397 results[key] = thread.value
399 sendlog.info('- - - wait for threads to finish')
400 for key, thread in flying.items():
404 raise thread.exception
405 results[key] = thread.value
406 return results.values()
408 def _raise_for_status(self, r):
409 log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
410 status_msg = getattr(r, 'status', None) or ''
412 message = '%s %s\n' % (status_msg, r.text)
414 message = '%s %s\n' % (status_msg, r)
415 status = getattr(r, 'status_code', getattr(r, 'status', 0))
416 raise ClientError(message, status=status)
418 def set_header(self, name, value, iff=True):
419 """Set a header 'name':'value'"""
420 if value is not None and iff:
421 self.headers[name] = unicode(value)
423 def set_param(self, name, value=None, iff=True):
425 self.params[name] = '%s' % value # unicode(value)
429 async_headers=dict(), async_params=dict(),
431 """Commit an HTTP request to base_url/path
432 Requests are commited to and performed by Request/ResponseManager
433 These classes perform a lazy http request. Present method, by default,
434 enforces them to perform the http call. Hint: call present method with
435 success=None to get a non-performed ResponseManager object.
437 assert isinstance(method, str) or isinstance(method, unicode)
439 assert isinstance(path, str) or isinstance(path, unicode)
441 headers = dict(self.headers)
442 headers.update(async_headers)
443 params = dict(self.params)
444 params.update(async_params)
445 success = kwargs.pop('success', 200)
446 data = kwargs.pop('data', None)
447 headers.setdefault('X-Auth-Token', self.token)
449 data = dumps(kwargs.pop('json'))
450 headers.setdefault('Content-Type', 'application/json')
452 headers.setdefault('Content-Length', '%s' % len(data))
454 plog = ('\t[%s]' % self) if self.LOG_PID else ''
455 sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
456 req = RequestManager(
457 method, self.base_url, path,
458 data=data, headers=headers, params=params)
462 poolsize=self.poolsize,
463 connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
464 r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
465 self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
466 r._token = headers['X-Auth-Token']
468 self.headers = dict()
471 if success is not None:
472 # Success can either be an int or a collection
473 success = (success,) if isinstance(success, int) else success
474 if r.status_code not in success:
475 self._raise_for_status(r)
478 def delete(self, path, **kwargs):
479 return self.request('delete', path, **kwargs)
481 def get(self, path, **kwargs):
482 return self.request('get', path, **kwargs)
484 def head(self, path, **kwargs):
485 return self.request('head', path, **kwargs)
487 def post(self, path, **kwargs):
488 return self.request('post', path, **kwargs)
490 def put(self, path, **kwargs):
491 return self.request('put', path, **kwargs)
493 def copy(self, path, **kwargs):
494 return self.request('copy', path, **kwargs)
496 def move(self, path, **kwargs):
497 return self.request('move', path, **kwargs)
500 class Waiter(object):
503 self, item_id, wait_status, get_status,
504 delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
505 """Wait while the item is still in wait_status or to reach it
507 :param server_id: integer (str or int)
509 :param wait_status: (str)
511 :param get_status: (method(self, item_id)) if called, returns
512 (status, progress %) If no way to tell progress, return None
514 :param delay: time interval between retries
516 :param wait_cb: (method(total steps)) returns a generator for
517 reporting progress or timeouts i.e., for a progress bar
519 :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)
521 :returns: (str) the new mode if successful, (bool) False if timed out
523 status, progress = get_status(self, item_id)
526 wait_gen = wait_cb(max_wait // delay)
529 if wait_for_status ^ (status != wait_status):
536 old_wait = total_wait = 0
538 while (wait_for_status ^ (status == wait_status)) and (
539 total_wait <= max_wait):
542 for i in range(total_wait - old_wait):
546 old_wait = total_wait
547 total_wait = progress or total_wait + 1
549 status, progress = get_status(self, item_id)
551 if total_wait < max_wait:
554 for i in range(max_wait):
558 return status if (wait_for_status ^ (status != wait_status)) else False
561 self, item_id, target_status, get_status,
562 delay=1, max_wait=100, wait_cb=None):
564 item_id, target_status, get_status, delay, max_wait, wait_cb,
565 wait_for_status=True)
568 self, item_id, target_status, get_status,
569 delay=1, max_wait=100, wait_cb=None):
571 item_id, target_status, get_status, delay, max_wait, wait_cb,
572 wait_for_status=False)