98211ee3434d694224e512113e29d465e9ba86c9
[kamaki] / kamaki / clients / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, self.list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, self.list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from urllib2 import quote, unquote
35 from urlparse import urlparse
36 from threading import Thread
37 from json import dumps, loads
38 from time import time
39 from httplib import ResponseNotReady, HTTPException
40 from time import sleep
41 from random import random
42 from logging import getLogger
43
44 from objpool.http import PooledHTTPConnection
45
46
47 TIMEOUT = 60.0   # seconds
48 HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49
50 log = getLogger(__name__)
51 sendlog = getLogger('%s.send' % __name__)
52 recvlog = getLogger('%s.recv' % __name__)
53
54
55 def _encode(v):
56     if v and isinstance(v, unicode):
57         return quote(v.encode('utf-8'))
58     return v
59
60
61 class ClientError(Exception):
62     def __init__(self, message, status=0, details=None):
63         log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64             message,
65             status,
66             details))
67         try:
68             message += '' if message and message[-1] == '\n' else '\n'
69             serv_stat, sep, new_msg = message.partition('{')
70             new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71             json_msg = loads(new_msg)
72             key = json_msg.keys()[0]
73             serv_stat = serv_stat.strip()
74
75             json_msg = json_msg[key]
76             message = '%s %s (%s)\n' % (
77                 serv_stat,
78                 key,
79                 json_msg['message']) if (
80                     'message' in json_msg) else '%s %s' % (serv_stat, key)
81             status = json_msg.get('code', status)
82             if 'details' in json_msg:
83                 if not details:
84                     details = []
85                 if not isinstance(details, list):
86                     details = [details]
87                 if json_msg['details']:
88                     details.append(json_msg['details'])
89         except Exception:
90             pass
91         finally:
92             while message.endswith('\n\n'):
93                 message = message[:-1]
94             super(ClientError, self).__init__(message)
95             self.status = status if isinstance(status, int) else 0
96             self.details = details if details else []
97
98
99 class Logged(object):
100
101     LOG_TOKEN = False
102     LOG_DATA = False
103     LOG_PID = False
104     _token = None
105
106
107 class RequestManager(Logged):
108     """Handle http request information"""
109
110     def _connection_info(self, url, path, params={}):
111         """ Set self.url to scheme://netloc/?params
112         :param url: (str or unicode) The service url
113
114         :param path: (str or unicode) The service path (url/path)
115
116         :param params: (dict) Parameters to add to final url
117
118         :returns: (scheme, netloc)
119         """
120         url = _encode(str(url)) if url else 'http://127.0.0.1/'
121         url += '' if url.endswith('/') else '/'
122         if path:
123             url += _encode(path[1:] if path.startswith('/') else path)
124         delim = '?'
125         for key, val in params.items():
126             val = '' if val in (None, False) else _encode('%s' % val)
127             url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
128             delim = '&'
129         parsed = urlparse(url)
130         self.url = url
131         self.path = parsed.path or '/'
132         if parsed.query:
133             self.path += '?%s' % parsed.query
134         return (parsed.scheme, parsed.netloc)
135
136     def __init__(
137             self, method, url, path,
138             data=None, headers={}, params={}):
139         method = method.upper()
140         assert method in HTTP_METHODS, 'Invalid http method %s' % method
141         if headers:
142             assert isinstance(headers, dict)
143         self.headers = dict(headers)
144         self.method, self.data = method, data
145         self.scheme, self.netloc = self._connection_info(url, path, params)
146
147     def dump_log(self):
148         plog = ('\t[%s]' % self) if self.LOG_PID else ''
149         sendlog.info('- -  -   -     -        -             -')
150         sendlog.info('%s %s://%s%s%s' % (
151             self.method, self.scheme, self.netloc, self.path, plog))
152         for key, val in self.headers.items():
153             if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
154                 self._token, val = val, '...'
155             sendlog.info('  %s: %s%s' % (key, val, plog))
156         if self.data:
157             sendlog.info('data size:%s%s' % (len(self.data), plog))
158             if self.LOG_DATA:
159                 sendlog.info(self.data.replace(self._token, '...') if (
160                     self._token) else self.data)
161         else:
162             sendlog.info('data size:0%s' % plog)
163
164     def perform(self, conn):
165         """
166         :param conn: (httplib connection object)
167
168         :returns: (HTTPResponse)
169         """
170         self.dump_log()
171         conn.request(
172             method=str(self.method.upper()),
173             url=str(self.path),
174             headers=self.headers,
175             body=self.data)
176         sendlog.info('')
177         keep_trying = TIMEOUT
178         while keep_trying > 0:
179             try:
180                 return conn.getresponse()
181             except ResponseNotReady:
182                 wait = 0.03 * random()
183                 sleep(wait)
184                 keep_trying -= wait
185         plog = ('\t[%s]' % self) if self.LOG_PID else ''
186         logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
187         recvlog.debug(logmsg)
188         raise ClientError('HTTPResponse takes too long - kamaki timeout')
189
190
191 class ResponseManager(Logged):
192     """Manage the http request and handle the response data, headers, etc."""
193
194     def __init__(self, request, poolsize=None, connection_retry_limit=0):
195         """
196         :param request: (RequestManager)
197
198         :param poolsize: (int) the size of the connection pool
199
200         :param connection_retry_limit: (int)
201         """
202         self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
203         self.request = request
204         self._request_performed = False
205         self.poolsize = poolsize
206
207     def _get_response(self):
208         if self._request_performed:
209             return
210
211         pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
212         for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
213             try:
214                 with PooledHTTPConnection(
215                         self.request.netloc, self.request.scheme,
216                         **pool_kw) as connection:
217                     self.request.LOG_TOKEN = self.LOG_TOKEN
218                     self.request.LOG_DATA = self.LOG_DATA
219                     self.request.LOG_PID = self.LOG_PID
220                     r = self.request.perform(connection)
221                     plog = ''
222                     if self.LOG_PID:
223                         recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
224                             self, r, self.request))
225                         plog = '\t[%s]' % self
226                     self._request_performed = True
227                     self._status_code, self._status = r.status, unquote(
228                         r.reason)
229                     recvlog.info(
230                         '%d %s%s' % (
231                             self.status_code, self.status, plog))
232                     self._headers = dict()
233                     for k, v in r.getheaders():
234                         if k.lower in ('x-auth-token', ) and (
235                                 not self.LOG_TOKEN):
236                             self._token, v = v, '...'
237                         v = unquote(v)
238                         self._headers[k] = v
239                         recvlog.info('  %s: %s%s' % (k, v, plog))
240                     self._content = r.read()
241                     recvlog.info('data size: %s%s' % (
242                         len(self._content) if self._content else 0, plog))
243                     if self.LOG_DATA and self._content:
244                         data = '%s%s' % (self._content, plog)
245                         if self._token:
246                             data = data.replace(self._token, '...')
247                         recvlog.info(data)
248                     recvlog.info('-             -        -     -   -  - -')
249                 break
250             except Exception as err:
251                 if isinstance(err, HTTPException):
252                     if retries >= self.CONNECTION_TRY_LIMIT:
253                         raise ClientError(
254                             'Connection to %s failed %s times (%s: %s )' % (
255                                 self.request.url, retries, type(err), err))
256                 else:
257                     from traceback import format_stack
258                     recvlog.debug(
259                         '\n'.join(['%s' % type(err)] + format_stack()))
260                     raise ClientError(
261                         'Failed while http-connecting to %s (%s)' % (
262                             self.request.url, err))
263
264     @property
265     def status_code(self):
266         self._get_response()
267         return self._status_code
268
269     @property
270     def status(self):
271         self._get_response()
272         return self._status
273
274     @property
275     def headers(self):
276         self._get_response()
277         return self._headers
278
279     @property
280     def content(self):
281         self._get_response()
282         return self._content
283
284     @property
285     def text(self):
286         """
287         :returns: (str) content
288         """
289         self._get_response()
290         return '%s' % self._content
291
292     @property
293     def json(self):
294         """
295         :returns: (dict) squeezed from json-formated content
296         """
297         self._get_response()
298         try:
299             return loads(self._content)
300         except ValueError as err:
301             raise ClientError('Response not formated in JSON - %s' % err)
302
303
304 class SilentEvent(Thread):
305     """Thread-run method(*args, **kwargs)"""
306     def __init__(self, method, *args, **kwargs):
307         super(self.__class__, self).__init__()
308         self.method = method
309         self.args = args
310         self.kwargs = kwargs
311
312     @property
313     def exception(self):
314         return getattr(self, '_exception', False)
315
316     @property
317     def value(self):
318         return getattr(self, '_value', None)
319
320     def run(self):
321         try:
322             self._value = self.method(*(self.args), **(self.kwargs))
323         except Exception as e:
324             recvlog.debug('Thread %s got exception %s\n<%s %s' % (
325                 self,
326                 type(e),
327                 e.status if isinstance(e, ClientError) else '',
328                 e))
329             self._exception = e
330
331
332 class Client(Logged):
333
334     MAX_THREADS = 1
335     DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
336     CONNECTION_RETRY_LIMIT = 0
337
338     def __init__(self, base_url, token):
339         assert base_url, 'No base_url for client %s' % self
340         self.base_url = base_url
341         self.token = token
342         self.headers, self.params = dict(), dict()
343         self.poolsize = None
344
345     def _init_thread_limit(self, limit=1):
346         assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
347         self._thread_limit = limit
348         self._elapsed_old = 0.0
349         self._elapsed_new = 0.0
350
351     def _watch_thread_limit(self, threadlist):
352         self._thread_limit = getattr(self, '_thread_limit', 1)
353         self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
354         self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
355         recvlog.debug('# running threads: %s' % len(threadlist))
356         if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
357                 self._thread_limit < self.MAX_THREADS):
358             self._thread_limit += 1
359         elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
360             self._thread_limit -= 1
361
362         self._elapsed_old = self._elapsed_new
363         if len(threadlist) >= self._thread_limit:
364             self._elapsed_new = 0.0
365             for thread in threadlist:
366                 begin_time = time()
367                 thread.join()
368                 self._elapsed_new += time() - begin_time
369             self._elapsed_new = self._elapsed_new / len(threadlist)
370             return []
371         return threadlist
372
373     def async_run(self, method, kwarg_list):
374         """Fire threads of operations
375
376         :param method: the method to run in each thread
377
378         :param kwarg_list: (list of dicts) the arguments to pass in each method
379             call
380
381         :returns: (list) the results of each method call w.r. to the order of
382             kwarg_list
383         """
384         flying, results = {}, {}
385         self._init_thread_limit()
386         for index, kwargs in enumerate(kwarg_list):
387             self._watch_thread_limit(flying.values())
388             flying[index] = SilentEvent(method=method, **kwargs)
389             flying[index].start()
390             unfinished = {}
391             for key, thread in flying.items():
392                 if thread.isAlive():
393                     unfinished[key] = thread
394                 elif thread.exception:
395                     raise thread.exception
396                 else:
397                     results[key] = thread.value
398             flying = unfinished
399         sendlog.info('- - - wait for threads to finish')
400         for key, thread in flying.items():
401             if thread.isAlive():
402                 thread.join()
403             if thread.exception:
404                 raise thread.exception
405             results[key] = thread.value
406         return results.values()
407
408     def _raise_for_status(self, r):
409         log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
410         status_msg = getattr(r, 'status', None) or ''
411         try:
412             message = '%s %s\n' % (status_msg, r.text)
413         except:
414             message = '%s %s\n' % (status_msg, r)
415         status = getattr(r, 'status_code', getattr(r, 'status', 0))
416         raise ClientError(message, status=status)
417
418     def set_header(self, name, value, iff=True):
419         """Set a header 'name':'value'"""
420         if value is not None and iff:
421             self.headers[name] = unicode(value)
422
423     def set_param(self, name, value=None, iff=True):
424         if iff:
425             self.params[name] = '%s' % value  # unicode(value)
426
427     def request(
428             self, method, path,
429             async_headers=dict(), async_params=dict(),
430             **kwargs):
431         """Commit an HTTP request to base_url/path
432         Requests are commited to and performed by Request/ResponseManager
433         These classes perform a lazy http request. Present method, by default,
434         enforces them to perform the http call. Hint: call present method with
435         success=None to get a non-performed ResponseManager object.
436         """
437         assert isinstance(method, str) or isinstance(method, unicode)
438         assert method
439         assert isinstance(path, str) or isinstance(path, unicode)
440         try:
441             headers = dict(self.headers)
442             headers.update(async_headers)
443             params = dict(self.params)
444             params.update(async_params)
445             success = kwargs.pop('success', 200)
446             data = kwargs.pop('data', None)
447             headers.setdefault('X-Auth-Token', self.token)
448             if 'json' in kwargs:
449                 data = dumps(kwargs.pop('json'))
450                 headers.setdefault('Content-Type', 'application/json')
451             if data:
452                 headers.setdefault('Content-Length', '%s' % len(data))
453
454             plog = ('\t[%s]' % self) if self.LOG_PID else ''
455             sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
456             req = RequestManager(
457                 method, self.base_url, path,
458                 data=data, headers=headers, params=params)
459             #  req.log()
460             r = ResponseManager(
461                 req,
462                 poolsize=self.poolsize,
463                 connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
464             r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
465                 self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
466             r._token = headers['X-Auth-Token']
467         finally:
468             self.headers = dict()
469             self.params = dict()
470
471         if success is not None:
472             # Success can either be an int or a collection
473             success = (success,) if isinstance(success, int) else success
474             if r.status_code not in success:
475                 self._raise_for_status(r)
476         return r
477
478     def delete(self, path, **kwargs):
479         return self.request('delete', path, **kwargs)
480
481     def get(self, path, **kwargs):
482         return self.request('get', path, **kwargs)
483
484     def head(self, path, **kwargs):
485         return self.request('head', path, **kwargs)
486
487     def post(self, path, **kwargs):
488         return self.request('post', path, **kwargs)
489
490     def put(self, path, **kwargs):
491         return self.request('put', path, **kwargs)
492
493     def copy(self, path, **kwargs):
494         return self.request('copy', path, **kwargs)
495
496     def move(self, path, **kwargs):
497         return self.request('move', path, **kwargs)
498
499
500 class Waiter(object):
501
502     def _wait(
503             self, item_id, wait_status, get_status,
504             delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
505         """Wait while the item is still in wait_status or to reach it
506
507         :param server_id: integer (str or int)
508
509         :param wait_status: (str)
510
511         :param get_status: (method(self, item_id)) if called, returns
512             (status, progress %) If no way to tell progress, return None
513
514         :param delay: time interval between retries
515
516         :param wait_cb: (method(total steps)) returns a generator for
517             reporting progress or timeouts i.e., for a progress bar
518
519         :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)
520
521         :returns: (str) the new mode if successful, (bool) False if timed out
522         """
523         status, progress = get_status(self, item_id)
524
525         if wait_cb:
526             wait_gen = wait_cb(max_wait // delay)
527             wait_gen.next()
528
529         if wait_for_status ^ (status != wait_status):
530             # if wait_cb:
531             #     try:
532             #         wait_gen.next()
533             #     except Exception:
534             #         pass
535             return status
536         old_wait = total_wait = 0
537
538         while (wait_for_status ^ (status == wait_status)) and (
539                 total_wait <= max_wait):
540             if wait_cb:
541                 try:
542                     for i in range(total_wait - old_wait):
543                         wait_gen.next()
544                 except Exception:
545                     break
546             old_wait = total_wait
547             total_wait = progress or total_wait + 1
548             sleep(delay)
549             status, progress = get_status(self, item_id)
550
551         if total_wait < max_wait:
552             if wait_cb:
553                 try:
554                     for i in range(max_wait):
555                         wait_gen.next()
556                 except:
557                     pass
558         return status if (wait_for_status ^ (status != wait_status)) else False
559
560     def wait_for(
561             self, item_id, target_status, get_status,
562             delay=1, max_wait=100, wait_cb=None):
563         self._wait(
564             item_id, target_status, get_status, delay, max_wait, wait_cb,
565             wait_for_status=True)
566
567     def wait_while(
568             self, item_id, target_status, get_status,
569             delay=1, max_wait=100, wait_cb=None):
570         self._wait(
571             item_id, target_status, get_status, delay, max_wait, wait_cb,
572             wait_for_status=False)