Handle url proccessing even if url is unicode
[kamaki] / kamaki / clients / __init__.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, self.list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, self.list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from urllib2 import quote, unquote
35 from urlparse import urlparse
36 from threading import Thread
37 from json import dumps, loads
38 from time import time
39 from httplib import ResponseNotReady
40 from time import sleep
41 from random import random
42 from logging import getLogger
43
44 from objpool.http import PooledHTTPConnection
45
46
47 TIMEOUT = 60.0   # seconds
48 HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49
50 log = getLogger(__name__)
51 sendlog = getLogger('%s.send' % __name__)
52 recvlog = getLogger('%s.recv' % __name__)
53
54
55 def _encode(v):
56     if v and isinstance(v, unicode):
57         return quote(v.encode('utf-8'))
58     return v
59
60
61 class ClientError(Exception):
62     def __init__(self, message, status=0, details=None):
63         log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64             message,
65             status,
66             details))
67         try:
68             message += '' if message and message[-1] == '\n' else '\n'
69             serv_stat, sep, new_msg = message.partition('{')
70             new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71             json_msg = loads(new_msg)
72             key = json_msg.keys()[0]
73             serv_stat = serv_stat.strip()
74
75             json_msg = json_msg[key]
76             message = '%s %s (%s)\n' % (
77                 serv_stat,
78                 key,
79                 json_msg['message']) if (
80                     'message' in json_msg) else '%s %s' % (serv_stat, key)
81             status = json_msg.get('code', status)
82             if 'details' in json_msg:
83                 if not details:
84                     details = []
85                 if not isinstance(details, list):
86                     details = [details]
87                 if json_msg['details']:
88                     details.append(json_msg['details'])
89         except Exception:
90             pass
91         finally:
92             while message.endswith('\n\n'):
93                 message = message[:-1]
94             super(ClientError, self).__init__(message)
95             self.status = status if isinstance(status, int) else 0
96             self.details = details if details else []
97
98
99 class Logged(object):
100
101     LOG_TOKEN = False
102     LOG_DATA = False
103
104
105 class RequestManager(Logged):
106     """Handle http request information"""
107
108     def _connection_info(self, url, path, params={}):
109         """ Set self.url to scheme://netloc/?params
110         :param url: (str or unicode) The service url
111
112         :param path: (str or unicode) The service path (url/path)
113
114         :param params: (dict) Parameters to add to final url
115
116         :returns: (scheme, netloc)
117         """
118         url = _encode(str(url)) if url else 'http://127.0.0.1/'
119         url += '' if url.endswith('/') else '/'
120         if path:
121             url += _encode(path[1:] if path.startswith('/') else path)
122         delim = '?'
123         for key, val in params.items():
124             val = _encode(val)
125             url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
126             delim = '&'
127         parsed = urlparse(url)
128         self.url = url
129         self.path = parsed.path or '/'
130         if parsed.query:
131             self.path += '?%s' % parsed.query
132         return (parsed.scheme, parsed.netloc)
133
134     def __init__(
135             self, method, url, path,
136             data=None, headers={}, params={}):
137         method = method.upper()
138         assert method in HTTP_METHODS, 'Invalid http method %s' % method
139         if headers:
140             assert isinstance(headers, dict)
141         self.headers = dict(headers)
142         self.method, self.data = method, data
143         self.scheme, self.netloc = self._connection_info(url, path, params)
144
145     def dump_log(self):
146         sendlog.info('%s %s://%s%s\t[%s]' % (
147             self.method,
148             self.scheme,
149             self.netloc,
150             self.path,
151             self))
152         for key, val in self.headers.items():
153             if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
154                 continue
155             sendlog.info('  %s: %s\t[%s]' % (key, val, self))
156         if self.data:
157             sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
158             if self.LOG_DATA:
159                 sendlog.info(self.data)
160         else:
161             sendlog.info('data size:0\t[%s]' % self)
162         sendlog.info('')
163
164     def perform(self, conn):
165         """
166         :param conn: (httplib connection object)
167
168         :returns: (HTTPResponse)
169         """
170         conn.request(
171             method=str(self.method.upper()),
172             url=str(self.path),
173             headers=self.headers,
174             body=self.data)
175         self.dump_log()
176         keep_trying = TIMEOUT
177         while keep_trying > 0:
178             try:
179                 return conn.getresponse()
180             except ResponseNotReady:
181                 wait = 0.03 * random()
182                 sleep(wait)
183                 keep_trying -= wait
184         logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
185         recvlog.debug(logmsg)
186         raise ClientError('HTTPResponse takes too long - kamaki timeout')
187
188
189 class ResponseManager(Logged):
190     """Manage the http request and handle the response data, headers, etc."""
191
192     def __init__(self, request, poolsize=None):
193         """
194         :param request: (RequestManager)
195         """
196         self.request = request
197         self._request_performed = False
198         self.poolsize = poolsize
199
200     def _get_response(self):
201         if self._request_performed:
202             return
203
204         pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
205         try:
206             with PooledHTTPConnection(
207                     self.request.netloc, self.request.scheme,
208                     **pool_kw) as connection:
209                 self.request.LOG_TOKEN = self.LOG_TOKEN
210                 self.request.LOG_DATA = self.LOG_DATA
211                 r = self.request.perform(connection)
212                 recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
213                     self, r, self.request))
214                 self._request_performed = True
215                 self._status_code, self._status = r.status, unquote(r.reason)
216                 recvlog.info(
217                     '%d %s\t[p: %s]' % (self.status_code, self.status, self))
218                 self._headers = dict()
219                 for k, v in r.getheaders():
220                     if (not self.LOG_TOKEN) and k.lower() == 'x-auth-token':
221                         continue
222                     v = unquote(v)
223                     self._headers[k] = v
224                     recvlog.info('  %s: %s\t[p: %s]' % (k, v, self))
225                 self._content = r.read()
226                 recvlog.info('data size: %s\t[p: %s]' % (
227                     len(self._content) if self._content else 0,
228                     self))
229                 if self.LOG_DATA and self._content:
230                     recvlog.info('%s\t[p: %s]' % (self._content, self))
231         except Exception as err:
232             from traceback import format_stack
233             recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
234             raise ClientError(
235                 'Failed while http-connecting to %s (%s)' % (
236                     self.request.url,
237                     err))
238
239     @property
240     def status_code(self):
241         self._get_response()
242         return self._status_code
243
244     @property
245     def status(self):
246         self._get_response()
247         return self._status
248
249     @property
250     def headers(self):
251         self._get_response()
252         return self._headers
253
254     @property
255     def content(self):
256         self._get_response()
257         return self._content
258
259     @property
260     def text(self):
261         """
262         :returns: (str) content
263         """
264         self._get_response()
265         return '%s' % self._content
266
267     @property
268     def json(self):
269         """
270         :returns: (dict) squeezed from json-formated content
271         """
272         self._get_response()
273         try:
274             return loads(self._content)
275         except ValueError as err:
276             raise ClientError('Response not formated in JSON - %s' % err)
277
278
279 class SilentEvent(Thread):
280     """Thread-run method(*args, **kwargs)"""
281     def __init__(self, method, *args, **kwargs):
282         super(self.__class__, self).__init__()
283         self.method = method
284         self.args = args
285         self.kwargs = kwargs
286
287     @property
288     def exception(self):
289         return getattr(self, '_exception', False)
290
291     @property
292     def value(self):
293         return getattr(self, '_value', None)
294
295     def run(self):
296         try:
297             self._value = self.method(*(self.args), **(self.kwargs))
298         except Exception as e:
299             recvlog.debug('Thread %s got exception %s\n<%s %s' % (
300                 self,
301                 type(e),
302                 e.status if isinstance(e, ClientError) else '',
303                 e))
304             self._exception = e
305
306
307 class Client(object):
308
309     MAX_THREADS = 7
310     DATE_FORMATS = [
311         '%a %b %d %H:%M:%S %Y',
312         '%A, %d-%b-%y %H:%M:%S GMT',
313         '%a, %d %b %Y %H:%M:%S GMT']
314     LOG_TOKEN = False
315     LOG_DATA = False
316
317     def __init__(self, base_url, token):
318         assert base_url, 'No base_url for client %s' % self
319         self.base_url = base_url
320         self.token = token
321         self.headers, self.params = dict(), dict()
322
323     def _init_thread_limit(self, limit=1):
324         assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
325         self._thread_limit = limit
326         self._elapsed_old = 0.0
327         self._elapsed_new = 0.0
328
329     def _watch_thread_limit(self, threadlist):
330         self._thread_limit = getattr(self, '_thread_limit', 1)
331         self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
332         self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
333         recvlog.debug('# running threads: %s' % len(threadlist))
334         if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
335                 self._thread_limit < self.MAX_THREADS):
336             self._thread_limit += 1
337         elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
338             self._thread_limit -= 1
339
340         self._elapsed_old = self._elapsed_new
341         if len(threadlist) >= self._thread_limit:
342             self._elapsed_new = 0.0
343             for thread in threadlist:
344                 begin_time = time()
345                 thread.join()
346                 self._elapsed_new += time() - begin_time
347             self._elapsed_new = self._elapsed_new / len(threadlist)
348             return []
349         return threadlist
350
351     def _raise_for_status(self, r):
352         log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
353         status_msg = getattr(r, 'status', None) or ''
354         try:
355             message = '%s %s\n' % (status_msg, r.text)
356         except:
357             message = '%s %s\n' % (status_msg, r)
358         status = getattr(r, 'status_code', getattr(r, 'status', 0))
359         raise ClientError(message, status=status)
360
361     def set_header(self, name, value, iff=True):
362         """Set a header 'name':'value'"""
363         if value is not None and iff:
364             self.headers[name] = value
365
366     def set_param(self, name, value=None, iff=True):
367         if iff:
368             self.params[name] = value
369
370     def request(
371             self, method, path,
372             async_headers=dict(), async_params=dict(),
373             **kwargs):
374         """Commit an HTTP request to base_url/path
375         Requests are commited to and performed by Request/ResponseManager
376         These classes perform a lazy http request. Present method, by default,
377         enforces them to perform the http call. Hint: call present method with
378         success=None to get a non-performed ResponseManager object.
379         """
380         assert isinstance(method, str) or isinstance(method, unicode)
381         assert method
382         assert isinstance(path, str) or isinstance(path, unicode)
383         try:
384             headers = dict(self.headers)
385             headers.update(async_headers)
386             params = dict(self.params)
387             params.update(async_params)
388             success = kwargs.pop('success', 200)
389             data = kwargs.pop('data', None)
390             headers.setdefault('X-Auth-Token', self.token)
391             if 'json' in kwargs:
392                 data = dumps(kwargs.pop('json'))
393                 headers.setdefault('Content-Type', 'application/json')
394             if data:
395                 headers.setdefault('Content-Length', '%s' % len(data))
396
397             sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
398             req = RequestManager(
399                 method, self.base_url, path,
400                 data=data, headers=headers, params=params)
401             #  req.log()
402             r = ResponseManager(req)
403             r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
404         finally:
405             self.headers = dict()
406             self.params = dict()
407
408         if success is not None:
409             # Success can either be an int or a collection
410             success = (success,) if isinstance(success, int) else success
411             if r.status_code not in success:
412                 self._raise_for_status(r)
413         return r
414
415     def delete(self, path, **kwargs):
416         return self.request('delete', path, **kwargs)
417
418     def get(self, path, **kwargs):
419         return self.request('get', path, **kwargs)
420
421     def head(self, path, **kwargs):
422         return self.request('head', path, **kwargs)
423
424     def post(self, path, **kwargs):
425         return self.request('post', path, **kwargs)
426
427     def put(self, path, **kwargs):
428         return self.request('put', path, **kwargs)
429
430     def copy(self, path, **kwargs):
431         return self.request('copy', path, **kwargs)
432
433     def move(self, path, **kwargs):
434         return self.request('move', path, **kwargs)