Fix details that ruin internal unicode objects
[kamaki] / kamaki / clients / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, self.list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, self.list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from urllib2 import quote, unquote
35 from urlparse import urlparse
36 from threading import Thread
37 from json import dumps, loads
38 from time import time
39 from httplib import ResponseNotReady, HTTPException
40 from time import sleep
41 from random import random
42 from logging import getLogger
43
44 from objpool.http import PooledHTTPConnection
45
46
47 TIMEOUT = 60.0   # seconds
48 HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49
50 log = getLogger(__name__)
51 sendlog = getLogger('%s.send' % __name__)
52 recvlog = getLogger('%s.recv' % __name__)
53
54
55 def _encode(v):
56     if v and isinstance(v, unicode):
57         return quote(v.encode('utf-8'))
58     return v
59
60
61 class ClientError(Exception):
62     def __init__(self, message, status=0, details=None):
63         log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64             message,
65             status,
66             details))
67         try:
68             message += '' if message and message[-1] == '\n' else '\n'
69             serv_stat, sep, new_msg = message.partition('{')
70             new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71             json_msg = loads(new_msg)
72             key = json_msg.keys()[0]
73             serv_stat = serv_stat.strip()
74
75             json_msg = json_msg[key]
76             message = '%s %s (%s)\n' % (
77                 serv_stat,
78                 key,
79                 json_msg['message']) if (
80                     'message' in json_msg) else '%s %s' % (serv_stat, key)
81             status = json_msg.get('code', status)
82             if 'details' in json_msg:
83                 if not details:
84                     details = []
85                 if not isinstance(details, list):
86                     details = [details]
87                 if json_msg['details']:
88                     details.append(json_msg['details'])
89         except Exception:
90             pass
91         finally:
92             while message.endswith('\n\n'):
93                 message = message[:-1]
94             super(ClientError, self).__init__(message)
95             self.status = status if isinstance(status, int) else 0
96             self.details = details if details else []
97
98
99 class Logged(object):
100
101     LOG_TOKEN = False
102     LOG_DATA = False
103     LOG_PID = False
104     _token = None
105
106
107 class RequestManager(Logged):
108     """Handle http request information"""
109
110     def _connection_info(self, url, path, params={}):
111         """ Set self.url to scheme://netloc/?params
112         :param url: (str or unicode) The service url
113
114         :param path: (str or unicode) The service path (url/path)
115
116         :param params: (dict) Parameters to add to final url
117
118         :returns: (scheme, netloc)
119         """
120         url = url or 'http://127.0.0.1/'
121         url += '' if url.endswith('/') else '/'
122         if path:
123             url += _encode(path[1:] if path.startswith('/') else path)
124         delim = '?'
125         for key, val in params.items():
126             val = quote('' if val in (None, False) else '%s' % _encode(val))
127             url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
128             delim = '&'
129         parsed = urlparse(url)
130         self.url = '%s' % url
131         self.path = (('%s' % parsed.path) if parsed.path else '/') + (
132             '?%s' % parsed.query if parsed.query else '')
133         return (parsed.scheme, parsed.netloc)
134
135     def __init__(
136             self, method, url, path,
137             data=None, headers={}, params={}):
138         method = method.upper()
139         assert method in HTTP_METHODS, 'Invalid http method %s' % method
140         if headers:
141             assert isinstance(headers, dict)
142         self.headers = dict(headers)
143         self.method, self.data = method, data
144         self.scheme, self.netloc = self._connection_info(url, path, params)
145
146     def dump_log(self):
147         plog = ('\t[%s]' % self) if self.LOG_PID else ''
148         sendlog.info('- -  -   -     -        -             -')
149         sendlog.info('%s %s://%s%s%s' % (
150             self.method, self.scheme, self.netloc, self.path, plog))
151         for key, val in self.headers.items():
152             if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
153                 self._token, val = val, '...'
154             sendlog.info('  %s: %s%s' % (key, val, plog))
155         if self.data:
156             sendlog.info('data size: %s%s' % (len(self.data), plog))
157             if self.LOG_DATA:
158                 sendlog.info(self.data.replace(self._token, '...') if (
159                     self._token) else self.data)
160         else:
161             sendlog.info('data size: 0%s' % plog)
162
163     def _encode_headers(self):
164         headers = dict(self.headers)
165         for k, v in self.headers.items():
166             headers[k] = quote(
167                 v.encode('utf-8') if isinstance(v, unicode) else v)
168         self.headers = headers
169
170     def perform(self, conn):
171         """
172         :param conn: (httplib connection object)
173
174         :returns: (HTTPResponse)
175         """
176         self._encode_headers()
177         self.dump_log()
178         conn.request(
179             method=self.method.upper(),
180             url=self.path.encode('utf-8'),
181             headers=self.headers,
182             body=self.data)
183         sendlog.info('')
184         keep_trying = TIMEOUT
185         while keep_trying > 0:
186             try:
187                 return conn.getresponse()
188             except ResponseNotReady:
189                 wait = 0.03 * random()
190                 sleep(wait)
191                 keep_trying -= wait
192         plog = ('\t[%s]' % self) if self.LOG_PID else ''
193         logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
194         recvlog.debug(logmsg)
195         raise ClientError('HTTPResponse takes too long - kamaki timeout')
196
197
198 class ResponseManager(Logged):
199     """Manage the http request and handle the response data, headers, etc."""
200
201     def __init__(self, request, poolsize=None, connection_retry_limit=0):
202         """
203         :param request: (RequestManager)
204
205         :param poolsize: (int) the size of the connection pool
206
207         :param connection_retry_limit: (int)
208         """
209         self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
210         self.request = request
211         self._request_performed = False
212         self.poolsize = poolsize
213         self._headers_to_decode, self._header_prefices = [], []
214
215     def _get_headers_to_decode(self, headers):
216         keys = set([k.lower() for k, v in headers])
217         encodable = list(keys.intersection(self.headers_to_decode))
218
219         def has_prefix(s):
220             for k in self.header_prefices:
221                 if s.startswith(k):
222                     return True
223             return False
224         return encodable + filter(has_prefix, keys.difference(encodable))
225
226     def _get_response(self):
227         if self._request_performed:
228             return
229
230         pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
231         for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
232             try:
233                 with PooledHTTPConnection(
234                         self.request.netloc, self.request.scheme,
235                         **pool_kw) as connection:
236                     self.request.LOG_TOKEN = self.LOG_TOKEN
237                     self.request.LOG_DATA = self.LOG_DATA
238                     self.request.LOG_PID = self.LOG_PID
239                     r = self.request.perform(connection)
240                     plog = ''
241                     if self.LOG_PID:
242                         recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
243                             self, r, self.request))
244                         plog = '\t[%s]' % self
245                     self._request_performed = True
246                     self._status_code, self._status = r.status, unquote(
247                         r.reason)
248                     recvlog.info(
249                         '%d %s%s' % (
250                             self.status_code, self.status, plog))
251                     self._headers = dict()
252
253                     r_headers = r.getheaders()
254                     enc_headers = self._get_headers_to_decode(r_headers)
255                     for k, v in r_headers:
256                         self._headers[k] = unquote(v).decode('utf-8') if (
257                             k.lower()) in enc_headers else v
258                         recvlog.info('  %s: %s%s' % (k, v, plog))
259                     self._content = r.read()
260                     recvlog.info('data size: %s%s' % (
261                         len(self._content) if self._content else 0, plog))
262                     if self.LOG_DATA and self._content:
263                         data = '%s%s' % (self._content, plog)
264                         if self._token:
265                             data = data.replace(self._token, '...')
266                         recvlog.info(data)
267                     recvlog.info('-             -        -     -   -  - -')
268                 break
269             except Exception as err:
270                 if isinstance(err, HTTPException):
271                     if retries >= self.CONNECTION_TRY_LIMIT:
272                         raise ClientError(
273                             'Connection to %s failed %s times (%s: %s )' % (
274                                 self.request.url, retries, type(err), err))
275                 else:
276                     from traceback import format_stack
277                     recvlog.debug(
278                         '\n'.join(['%s' % type(err)] + format_stack()))
279                     raise
280
281     @property
282     def status_code(self):
283         self._get_response()
284         return self._status_code
285
286     @property
287     def status(self):
288         self._get_response()
289         return self._status
290
291     @property
292     def headers(self):
293         self._get_response()
294         return self._headers
295
296     @property
297     def content(self):
298         self._get_response()
299         return self._content
300
301     @property
302     def text(self):
303         """
304         :returns: (str) content
305         """
306         self._get_response()
307         return '%s' % self._content
308
309     @property
310     def headers_to_decode(self):
311         return self._headers_to_decode
312
313     @headers_to_decode.setter
314     def headers_to_decode(self, header_keys):
315         self._headers_to_decode += [k.lower() for k in header_keys]
316         self._headers_to_decode = list(set(self._headers_to_decode))
317
318     @property
319     def header_prefices(self):
320         return self._header_prefices
321
322     @header_prefices.setter
323     def header_prefices(self, header_key_prefices):
324         self._header_prefices += [p.lower() for p in header_key_prefices]
325         self._header_prefices = list(set(self._header_prefices))
326
327     @property
328     def json(self):
329         """
330         :returns: (dict) squeezed from json-formated content
331         """
332         self._get_response()
333         try:
334             return loads(self._content)
335         except ValueError as err:
336             raise ClientError('Response not formated in JSON - %s' % err)
337
338
339 class SilentEvent(Thread):
340     """Thread-run method(*args, **kwargs)"""
341     def __init__(self, method, *args, **kwargs):
342         super(self.__class__, self).__init__()
343         self.method, self.args, self.kwargs = method, args, kwargs
344
345     @property
346     def exception(self):
347         return getattr(self, '_exception', False)
348
349     @property
350     def value(self):
351         return getattr(self, '_value', None)
352
353     def run(self):
354         try:
355             self._value = self.method(*(self.args), **(self.kwargs))
356         except Exception as e:
357             recvlog.debug('Thread %s got exception %s\n<%s %s' % (
358                 self,
359                 type(e),
360                 e.status if isinstance(e, ClientError) else '',
361                 e))
362             self._exception = e
363
364
365 class Client(Logged):
366
367     MAX_THREADS = 1
368     DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
369     CONNECTION_RETRY_LIMIT = 0
370
371     def __init__(self, base_url, token):
372         assert base_url, 'No base_url for client %s' % self
373         self.base_url = base_url
374         self.token = token
375         self.headers, self.params = dict(), dict()
376         self.poolsize = None
377         self.response_headers = []
378         self.response_header_prefices = []
379
380     def _init_thread_limit(self, limit=1):
381         assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
382         self._thread_limit = limit
383         self._elapsed_old = 0.0
384         self._elapsed_new = 0.0
385
386     def _watch_thread_limit(self, threadlist):
387         self._thread_limit = getattr(self, '_thread_limit', 1)
388         self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
389         self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
390         recvlog.debug('# running threads: %s' % len(threadlist))
391         if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
392                 self._thread_limit < self.MAX_THREADS):
393             self._thread_limit += 1
394         elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
395             self._thread_limit -= 1
396
397         self._elapsed_old = self._elapsed_new
398         if len(threadlist) >= self._thread_limit:
399             self._elapsed_new = 0.0
400             for thread in threadlist:
401                 begin_time = time()
402                 thread.join()
403                 self._elapsed_new += time() - begin_time
404             self._elapsed_new = self._elapsed_new / len(threadlist)
405             return []
406         return threadlist
407
408     def async_run(self, method, kwarg_list):
409         """Fire threads of operations
410
411         :param method: the method to run in each thread
412
413         :param kwarg_list: (list of dicts) the arguments to pass in each method
414             call
415
416         :returns: (list) the results of each method call w.r. to the order of
417             kwarg_list
418         """
419         flying, results = {}, {}
420         self._init_thread_limit()
421         for index, kwargs in enumerate(kwarg_list):
422             self._watch_thread_limit(flying.values())
423             flying[index] = SilentEvent(method=method, **kwargs)
424             flying[index].start()
425             unfinished = {}
426             for key, thread in flying.items():
427                 if thread.isAlive():
428                     unfinished[key] = thread
429                 elif thread.exception:
430                     raise thread.exception
431                 else:
432                     results[key] = thread.value
433             flying = unfinished
434         sendlog.info('- - - wait for threads to finish')
435         for key, thread in flying.items():
436             if thread.isAlive():
437                 thread.join()
438             if thread.exception:
439                 raise thread.exception
440             results[key] = thread.value
441         return results.values()
442
443     def _raise_for_status(self, r):
444         log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
445         status_msg = getattr(r, 'status', None) or ''
446         try:
447             message = '%s %s\n' % (status_msg, r.text)
448         except:
449             message = '%s %s\n' % (status_msg, r)
450         status = getattr(r, 'status_code', getattr(r, 'status', 0))
451         raise ClientError(message, status=status)
452
453     def set_header(self, name, value, iff=True):
454         """Set a header 'name':'value'"""
455         if value is not None and iff:
456             self.headers['%s' % name] = '%s' % value
457
458     def set_param(self, name, value=None, iff=True):
459         if iff:
460             self.params[name] = '%s' % value
461
462     def request(
463             self, method, path,
464             async_headers=dict(), async_params=dict(),
465             **kwargs):
466         """Commit an HTTP request to base_url/path
467         Requests are commited to and performed by Request/ResponseManager
468         These classes perform a lazy http request. Present method, by default,
469         enforces them to perform the http call. Hint: call present method with
470         success=None to get a non-performed ResponseManager object.
471         """
472         assert isinstance(method, str) or isinstance(method, unicode)
473         assert method
474         assert isinstance(path, str) or isinstance(path, unicode)
475         try:
476             headers = dict(self.headers)
477             headers.update(async_headers)
478             params = dict(self.params)
479             params.update(async_params)
480             success = kwargs.pop('success', 200)
481             data = kwargs.pop('data', None)
482             headers.setdefault('X-Auth-Token', self.token)
483             if 'json' in kwargs:
484                 data = dumps(kwargs.pop('json'))
485                 headers.setdefault('Content-Type', 'application/json')
486             if data:
487                 headers.setdefault('Content-Length', '%s' % len(data))
488             plog = ('\t[%s]' % self) if self.LOG_PID else ''
489             sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
490             req = RequestManager(
491                 method, self.base_url, path,
492                 data=data, headers=headers, params=params)
493             #  req.log()
494             r = ResponseManager(
495                 req,
496                 poolsize=self.poolsize,
497                 connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
498             r.headers_to_decode = self.response_headers
499             r.header_prefices = self.response_header_prefices
500             r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
501                 self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
502             r._token = headers['X-Auth-Token']
503         finally:
504             self.headers = dict()
505             self.params = dict()
506
507         if success is not None:
508             # Success can either be an int or a collection
509             success = (success,) if isinstance(success, int) else success
510             if r.status_code not in success:
511                 self._raise_for_status(r)
512         return r
513
514     def delete(self, path, **kwargs):
515         return self.request('delete', path, **kwargs)
516
517     def get(self, path, **kwargs):
518         return self.request('get', path, **kwargs)
519
520     def head(self, path, **kwargs):
521         return self.request('head', path, **kwargs)
522
523     def post(self, path, **kwargs):
524         return self.request('post', path, **kwargs)
525
526     def put(self, path, **kwargs):
527         return self.request('put', path, **kwargs)
528
529     def copy(self, path, **kwargs):
530         return self.request('copy', path, **kwargs)
531
532     def move(self, path, **kwargs):
533         return self.request('move', path, **kwargs)
534
535
536 class Waiter(object):
537
538     def _wait(
539             self, item_id, wait_status, get_status,
540             delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
541         """Wait while the item is still in wait_status or to reach it
542
543         :param server_id: integer (str or int)
544
545         :param wait_status: (str)
546
547         :param get_status: (method(self, item_id)) if called, returns
548             (status, progress %) If no way to tell progress, return None
549
550         :param delay: time interval between retries
551
552         :param wait_cb: (method(total steps)) returns a generator for
553             reporting progress or timeouts i.e., for a progress bar
554
555         :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)
556
557         :returns: (str) the new mode if successful, (bool) False if timed out
558         """
559         status, progress = get_status(self, item_id)
560
561         if wait_cb:
562             wait_gen = wait_cb(max_wait // delay)
563             wait_gen.next()
564
565         if wait_for_status ^ (status != wait_status):
566             # if wait_cb:
567             #     try:
568             #         wait_gen.next()
569             #     except Exception:
570             #         pass
571             return status
572         old_wait = total_wait = 0
573
574         while (wait_for_status ^ (status == wait_status)) and (
575                 total_wait <= max_wait):
576             if wait_cb:
577                 try:
578                     for i in range(total_wait - old_wait):
579                         wait_gen.next()
580                 except Exception:
581                     break
582             old_wait = total_wait
583             total_wait = progress or total_wait + 1
584             sleep(delay)
585             status, progress = get_status(self, item_id)
586
587         if total_wait < max_wait:
588             if wait_cb:
589                 try:
590                     for i in range(max_wait):
591                         wait_gen.next()
592                 except:
593                     pass
594         return status if (wait_for_status ^ (status != wait_status)) else False
595
596     def wait_for(
597             self, item_id, target_status, get_status,
598             delay=1, max_wait=100, wait_cb=None):
599         self._wait(
600             item_id, target_status, get_status, delay, max_wait, wait_cb,
601             wait_for_status=True)
602
603     def wait_while(
604             self, item_id, target_status, get_status,
605             delay=1, max_wait=100, wait_cb=None):
606         self._wait(
607             item_id, target_status, get_status, delay, max_wait, wait_cb,
608             wait_for_status=False)