2415abdf6450322030fef1535d8d4edf074cb270
[kamaki] / kamaki / clients / __init__.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, self.list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, self.list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from urllib2 import quote, unquote
35 from urlparse import urlparse
36 from threading import Thread
37 from json import dumps, loads
38 from time import time
39 from httplib import ResponseNotReady
40 from time import sleep
41 from random import random
42 from logging import getLogger
43
44 from objpool.http import PooledHTTPConnection
45
46
47 TIMEOUT = 60.0   # seconds
48 HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49
50 log = getLogger(__name__)
51 sendlog = getLogger('%s.send' % __name__)
52 recvlog = getLogger('%s.recv' % __name__)
53
54
55 def _encode(v):
56     if v and isinstance(v, unicode):
57         return quote(v.encode('utf-8'))
58     return v
59
60
61 class ClientError(Exception):
62     def __init__(self, message, status=0, details=None):
63         log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64             message,
65             status,
66             details))
67         try:
68             message += '' if message and message[-1] == '\n' else '\n'
69             serv_stat, sep, new_msg = message.partition('{')
70             new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71             json_msg = loads(new_msg)
72             key = json_msg.keys()[0]
73             serv_stat = serv_stat.strip()
74
75             json_msg = json_msg[key]
76             message = '%s %s (%s)\n' % (
77                 serv_stat,
78                 key,
79                 json_msg['message']) if (
80                     'message' in json_msg) else '%s %s' % (serv_stat, key)
81             status = json_msg.get('code', status)
82             if 'details' in json_msg:
83                 if not details:
84                     details = []
85                 if not isinstance(details, list):
86                     details = [details]
87                 if json_msg['details']:
88                     details.append(json_msg['details'])
89         except Exception:
90             pass
91         finally:
92             while message.endswith('\n\n'):
93                 message = message[:-1]
94             super(ClientError, self).__init__(message)
95             self.status = status if isinstance(status, int) else 0
96             self.details = details if details else []
97
98
99 class Logged(object):
100
101     LOG_TOKEN = False
102     LOG_DATA = False
103
104
105 class RequestManager(Logged):
106     """Handle http request information"""
107
108     def _connection_info(self, url, path, params={}):
109         """ Set self.url to scheme://netloc/?params
110         :param url: (str or unicode) The service url
111
112         :param path: (str or unicode) The service path (url/path)
113
114         :param params: (dict) Parameters to add to final url
115
116         :returns: (scheme, netloc)
117         """
118         url = _encode(str(url)) if url else 'http://127.0.0.1/'
119         url += '' if url.endswith('/') else '/'
120         if path:
121             url += _encode(path[1:] if path.startswith('/') else path)
122         delim = '?'
123         for key, val in params.items():
124             val = _encode(val)
125             url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
126             delim = '&'
127         parsed = urlparse(url)
128         self.url = url
129         self.path = parsed.path or '/'
130         if parsed.query:
131             self.path += '?%s' % parsed.query
132         return (parsed.scheme, parsed.netloc)
133
134     def __init__(
135             self, method, url, path,
136             data=None, headers={}, params={}):
137         method = method.upper()
138         assert method in HTTP_METHODS, 'Invalid http method %s' % method
139         if headers:
140             assert isinstance(headers, dict)
141         self.headers = dict(headers)
142         self.method, self.data = method, data
143         self.scheme, self.netloc = self._connection_info(url, path, params)
144
145     def dump_log(self):
146         sendlog.info('%s %s://%s%s\t[%s]' % (
147             self.method, self.scheme, self.netloc, self.path, self))
148         for key, val in self.headers.items():
149             if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
150                 continue
151             sendlog.info('  %s: %s\t[%s]' % (key, val, self))
152         if self.data:
153             sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
154             if self.LOG_DATA:
155                 sendlog.info(self.data)
156         else:
157             sendlog.info('data size:0\t[%s]' % self)
158         sendlog.info('')
159
160     def perform(self, conn):
161         """
162         :param conn: (httplib connection object)
163
164         :returns: (HTTPResponse)
165         """
166         conn.request(
167             method=str(self.method.upper()),
168             url=str(self.path),
169             headers=self.headers,
170             body=self.data)
171         self.dump_log()
172         keep_trying = TIMEOUT
173         while keep_trying > 0:
174             try:
175                 return conn.getresponse()
176             except ResponseNotReady:
177                 wait = 0.03 * random()
178                 sleep(wait)
179                 keep_trying -= wait
180         logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
181         recvlog.debug(logmsg)
182         raise ClientError('HTTPResponse takes too long - kamaki timeout')
183
184
185 class ResponseManager(Logged):
186     """Manage the http request and handle the response data, headers, etc."""
187
188     def __init__(self, request, poolsize=None):
189         """
190         :param request: (RequestManager)
191         """
192         self.request = request
193         self._request_performed = False
194         self.poolsize = poolsize
195
196     def _get_response(self):
197         if self._request_performed:
198             return
199
200         pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
201         try:
202             with PooledHTTPConnection(
203                     self.request.netloc, self.request.scheme,
204                     **pool_kw) as connection:
205                 self.request.LOG_TOKEN = self.LOG_TOKEN
206                 self.request.LOG_DATA = self.LOG_DATA
207                 r = self.request.perform(connection)
208                 recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
209                     self, r, self.request))
210                 self._request_performed = True
211                 self._status_code, self._status = r.status, unquote(r.reason)
212                 recvlog.info(
213                     '%d %s\t[p: %s]' % (self.status_code, self.status, self))
214                 self._headers = dict()
215                 for k, v in r.getheaders():
216                     if (not self.LOG_TOKEN) and k.lower() == 'x-auth-token':
217                         continue
218                     v = unquote(v)
219                     self._headers[k] = v
220                     recvlog.info('  %s: %s\t[p: %s]' % (k, v, self))
221                 self._content = r.read()
222                 recvlog.info('data size: %s\t[p: %s]' % (
223                     len(self._content) if self._content else 0,
224                     self))
225                 if self.LOG_DATA and self._content:
226                     recvlog.info('%s\t[p: %s]' % (self._content, self))
227         except Exception as err:
228             from traceback import format_stack
229             recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
230             raise ClientError(
231                 'Failed while http-connecting to %s (%s)' % (
232                     self.request.url,
233                     err))
234
235     @property
236     def status_code(self):
237         self._get_response()
238         return self._status_code
239
240     @property
241     def status(self):
242         self._get_response()
243         return self._status
244
245     @property
246     def headers(self):
247         self._get_response()
248         return self._headers
249
250     @property
251     def content(self):
252         self._get_response()
253         return self._content
254
255     @property
256     def text(self):
257         """
258         :returns: (str) content
259         """
260         self._get_response()
261         return '%s' % self._content
262
263     @property
264     def json(self):
265         """
266         :returns: (dict) squeezed from json-formated content
267         """
268         self._get_response()
269         try:
270             return loads(self._content)
271         except ValueError as err:
272             raise ClientError('Response not formated in JSON - %s' % err)
273
274
275 class SilentEvent(Thread):
276     """Thread-run method(*args, **kwargs)"""
277     def __init__(self, method, *args, **kwargs):
278         super(self.__class__, self).__init__()
279         self.method = method
280         self.args = args
281         self.kwargs = kwargs
282
283     @property
284     def exception(self):
285         return getattr(self, '_exception', False)
286
287     @property
288     def value(self):
289         return getattr(self, '_value', None)
290
291     def run(self):
292         try:
293             self._value = self.method(*(self.args), **(self.kwargs))
294         except Exception as e:
295             recvlog.debug('Thread %s got exception %s\n<%s %s' % (
296                 self,
297                 type(e),
298                 e.status if isinstance(e, ClientError) else '',
299                 e))
300             self._exception = e
301
302
303 class Client(object):
304
305     MAX_THREADS = 7
306     DATE_FORMATS = [
307         '%a %b %d %H:%M:%S %Y',
308         '%A, %d-%b-%y %H:%M:%S GMT',
309         '%a, %d %b %Y %H:%M:%S GMT']
310     LOG_TOKEN = False
311     LOG_DATA = False
312
313     def __init__(self, base_url, token):
314         assert base_url, 'No base_url for client %s' % self
315         self.base_url = base_url
316         self.token = token
317         self.headers, self.params = dict(), dict()
318
319     def _init_thread_limit(self, limit=1):
320         assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
321         self._thread_limit = limit
322         self._elapsed_old = 0.0
323         self._elapsed_new = 0.0
324
325     def _watch_thread_limit(self, threadlist):
326         self._thread_limit = getattr(self, '_thread_limit', 1)
327         self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
328         self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
329         recvlog.debug('# running threads: %s' % len(threadlist))
330         if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
331                 self._thread_limit < self.MAX_THREADS):
332             self._thread_limit += 1
333         elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
334             self._thread_limit -= 1
335
336         self._elapsed_old = self._elapsed_new
337         if len(threadlist) >= self._thread_limit:
338             self._elapsed_new = 0.0
339             for thread in threadlist:
340                 begin_time = time()
341                 thread.join()
342                 self._elapsed_new += time() - begin_time
343             self._elapsed_new = self._elapsed_new / len(threadlist)
344             return []
345         return threadlist
346
347     def _raise_for_status(self, r):
348         log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
349         status_msg = getattr(r, 'status', None) or ''
350         try:
351             message = '%s %s\n' % (status_msg, r.text)
352         except:
353             message = '%s %s\n' % (status_msg, r)
354         status = getattr(r, 'status_code', getattr(r, 'status', 0))
355         raise ClientError(message, status=status)
356
357     def set_header(self, name, value, iff=True):
358         """Set a header 'name':'value'"""
359         if value is not None and iff:
360             self.headers[name] = value
361
362     def set_param(self, name, value=None, iff=True):
363         if iff:
364             self.params[name] = value
365
366     def request(
367             self, method, path,
368             async_headers=dict(), async_params=dict(),
369             **kwargs):
370         """Commit an HTTP request to base_url/path
371         Requests are commited to and performed by Request/ResponseManager
372         These classes perform a lazy http request. Present method, by default,
373         enforces them to perform the http call. Hint: call present method with
374         success=None to get a non-performed ResponseManager object.
375         """
376         assert isinstance(method, str) or isinstance(method, unicode)
377         assert method
378         assert isinstance(path, str) or isinstance(path, unicode)
379         try:
380             headers = dict(self.headers)
381             headers.update(async_headers)
382             params = dict(self.params)
383             params.update(async_params)
384             success = kwargs.pop('success', 200)
385             data = kwargs.pop('data', None)
386             headers.setdefault('X-Auth-Token', self.token)
387             if 'json' in kwargs:
388                 data = dumps(kwargs.pop('json'))
389                 headers.setdefault('Content-Type', 'application/json')
390             if data:
391                 headers.setdefault('Content-Length', '%s' % len(data))
392
393             sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
394             req = RequestManager(
395                 method, self.base_url, path,
396                 data=data, headers=headers, params=params)
397             #  req.log()
398             r = ResponseManager(req)
399             r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
400         finally:
401             self.headers = dict()
402             self.params = dict()
403
404         if success is not None:
405             # Success can either be an int or a collection
406             success = (success,) if isinstance(success, int) else success
407             if r.status_code not in success:
408                 self._raise_for_status(r)
409         return r
410
411     def delete(self, path, **kwargs):
412         return self.request('delete', path, **kwargs)
413
414     def get(self, path, **kwargs):
415         return self.request('get', path, **kwargs)
416
417     def head(self, path, **kwargs):
418         return self.request('head', path, **kwargs)
419
420     def post(self, path, **kwargs):
421         return self.request('post', path, **kwargs)
422
423     def put(self, path, **kwargs):
424         return self.request('put', path, **kwargs)
425
426     def copy(self, path, **kwargs):
427         return self.request('copy', path, **kwargs)
428
429     def move(self, path, **kwargs):
430         return self.request('move', path, **kwargs)