Fix typos in logging mechanism
[kamaki] / kamaki / clients / __init__.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, self.list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, self.list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from urllib2 import quote
35 from urlparse import urlparse
36 from threading import Thread
37 from json import dumps, loads
38 from time import time
39 from httplib import ResponseNotReady
40 from time import sleep
41 from random import random
42
43 from objpool.http import PooledHTTPConnection
44
45 from kamaki.clients.utils import logger
46
47 DEBUG_LOG = logger.get_log_filename()
48 TIMEOUT = 60.0   # seconds
49 HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
50 LOG_TOKEN = False
51 LOG_DATA = True
52
53 logger.add_file_logger('clients.send', __name__, filename=DEBUG_LOG)
54 sendlog = logger.get_logger('clients.send')
55 sendlog.debug('Logging location: %s' % DEBUG_LOG)
56
57 logger.add_file_logger('data.send', __name__, filename=DEBUG_LOG)
58 datasendlog = logger.get_logger('data.send')
59
60 logger.add_file_logger('clients.recv', __name__, filename=DEBUG_LOG)
61 recvlog = logger.get_logger('clients.recv')
62
63 logger.add_file_logger('data.recv', __name__, filename=DEBUG_LOG)
64 datarecvlog = logger.get_logger('data.recv')
65
66 logger.add_file_logger('ClientError', __name__, filename=DEBUG_LOG)
67 clienterrorlog = logger.get_logger('ClientError')
68
69
70 def _encode(v):
71     if v and isinstance(v, unicode):
72         return quote(v.encode('utf-8'))
73     return v
74
75
76 class ClientError(Exception):
77     def __init__(self, message, status=0, details=None):
78         clienterrorlog.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
79             message,
80             status,
81             details))
82         try:
83             message += '' if message and message[-1] == '\n' else '\n'
84             serv_stat, sep, new_msg = message.partition('{')
85             new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
86             json_msg = loads(new_msg)
87             key = json_msg.keys()[0]
88             serv_stat = serv_stat.strip()
89
90             json_msg = json_msg[key]
91             message = '%s %s (%s)\n' % (
92                 serv_stat,
93                 key,
94                 json_msg['message']) if (
95                     'message' in json_msg) else '%s %s' % (serv_stat, key)
96             status = json_msg.get('code', status)
97             if 'details' in json_msg:
98                 if not details:
99                     details = []
100                 if not isinstance(details, list):
101                     details = [details]
102                 if json_msg['details']:
103                     details.append(json_msg['details'])
104         except Exception:
105             pass
106         finally:
107             while message.endswith('\n\n'):
108                 message = message[:-1]
109             super(ClientError, self).__init__(message)
110             self.status = status if isinstance(status, int) else 0
111             self.details = details if details else []
112
113
114 class RequestManager(object):
115     """Handle http request information"""
116
117     def _connection_info(self, url, path, params={}):
118         """ Set self.url to scheme://netloc/?params
119         :param url: (str or unicode) The service url
120
121         :param path: (str or unicode) The service path (url/path)
122
123         :param params: (dict) Parameters to add to final url
124
125         :returns: (scheme, netloc)
126         """
127         url = _encode(url) if url else 'http://127.0.0.1/'
128         url += '' if url.endswith('/') else '/'
129         if path:
130             url += _encode(path[1:] if path.startswith('/') else path)
131         delim = '?'
132         for key, val in params.items():
133             val = _encode(val)
134             url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
135             delim = '&'
136         parsed = urlparse(url)
137         self.url = url
138         self.path = parsed.path or '/'
139         if parsed.query:
140             self.path += '?%s' % parsed.query
141         return (parsed.scheme, parsed.netloc)
142
143     def __init__(
144             self, method, url, path,
145             data=None, headers={}, params={}):
146         method = method.upper()
147         assert method in HTTP_METHODS, 'Invalid http method %s' % method
148         if headers:
149             assert isinstance(headers, dict)
150         self.headers = dict(headers)
151         self.method, self.data = method, data
152         self.scheme, self.netloc = self._connection_info(url, path, params)
153
154     def log(self):
155         sendlog.debug('%s %s://%s%s\t[%s]' % (
156             self.method,
157             self.scheme,
158             self.netloc,
159             self.path,
160             self))
161         for key, val in self.headers.items():
162             if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
163                 continue
164             sendlog.debug('%s: %s\t[%s]', (key, val, self))
165         if self.data:
166             sendlog.debug('data size:%s\t[%s]' % (len(self.data), self))
167             if LOG_DATA:
168                 datasendlog.info(self.data)
169         else:
170             sendlog.debug('data size:0\t[%s]' % self)
171         sendlog.info('')
172
173     def perform(self, conn):
174         """
175         :param conn: (httplib connection object)
176
177         :returns: (HTTPResponse)
178         """
179         conn.request(
180             method=str(self.method.upper()),
181             url=str(self.path),
182             headers=self.headers,
183             body=self.data)
184         self.log()
185         keep_trying = 60.0
186         while keep_trying > 0:
187             try:
188                 return conn.getresponse()
189             except ResponseNotReady:
190                 wait = 0.03 * random()
191                 sleep(wait)
192                 keep_trying -= wait
193         logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
194         recvlog.debug(logmsg)
195         raise ClientError('HTTPResponse takes too long - kamaki timeout')
196
197
198 class ResponseManager(object):
199     """Manage the http request and handle the response data, headers, etc."""
200
201     def __init__(self, request, poolsize=None):
202         """
203         :param request: (RequestManager)
204         """
205         self.request = request
206         self._request_performed = False
207         self.poolsize = poolsize
208
209     def _get_response(self):
210         if self._request_performed:
211             return
212
213         pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
214         try:
215             with PooledHTTPConnection(
216                     self.request.netloc, self.request.scheme,
217                     **pool_kw) as connection:
218                 r = self.request.perform(connection)
219                 recvlog.debug('[resp: %s] <-- [req: %s]\n' % (r, self.request))
220                 self._request_performed = True
221                 self._status_code, self._status = r.status, r.reason
222                 recvlog.debug(
223                     '%d %s\t[p: %s]' % (self.status_code, self.status, self))
224                 self._headers = dict()
225                 for k, v in r.getheaders():
226                     if (not LOG_TOKEN) and k.lower() == 'x-auth-token':
227                         continue
228                     self._headers[k] = v
229                     recvlog.debug('  %s: %s\t[p: %s]' % (k, v, self))
230                 self._content = r.read()
231                 recvlog.debug('data size: %s\t[p: %s]' % (
232                     len(self._content) if self._content else 0,
233                     self))
234                 if LOG_DATA and self._content:
235                     datarecvlog.debug('%s\t[p: %s]' % (self._content, self))
236         except Exception as err:
237             from traceback import format_stack
238             recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
239             raise ClientError(
240                 'Failed while http-connecting to %s (%s)' % (
241                     self.request.url,
242                     err))
243
244     @property
245     def status_code(self):
246         self._get_response()
247         return self._status_code
248
249     @property
250     def status(self):
251         self._get_response()
252         return self._status
253
254     @property
255     def headers(self):
256         self._get_response()
257         return self._headers
258
259     @property
260     def content(self):
261         self._get_response()
262         return self._content
263
264     @property
265     def text(self):
266         """
267         :returns: (str) content
268         """
269         self._get_response()
270         return '%s' % self._content
271
272     @property
273     def json(self):
274         """
275         :returns: (dict) squeezed from json-formated content
276         """
277         self._get_response()
278         try:
279             return loads(self._content)
280         except ValueError as err:
281             raise ClientError('Response not formated in JSON - %s' % err)
282
283
284 class SilentEvent(Thread):
285     """Thread-run method(*args, **kwargs)"""
286     def __init__(self, method, *args, **kwargs):
287         super(self.__class__, self).__init__()
288         self.method = method
289         self.args = args
290         self.kwargs = kwargs
291
292     @property
293     def exception(self):
294         return getattr(self, '_exception', False)
295
296     @property
297     def value(self):
298         return getattr(self, '_value', None)
299
300     def run(self):
301         try:
302             self._value = self.method(*(self.args), **(self.kwargs))
303         except Exception as e:
304             recvlog.debug('Thread %s got exception %s\n<%s %s' % (
305                 self,
306                 type(e),
307                 e.status if isinstance(e, ClientError) else '',
308                 e))
309             self._exception = e
310
311
312 class Client(object):
313
314     MAX_THREADS = 7
315     DATE_FORMATS = [
316         '%a %b %d %H:%M:%S %Y',
317         '%A, %d-%b-%y %H:%M:%S GMT',
318         '%a, %d %b %Y %H:%M:%S GMT']
319
320     def __init__(self, base_url, token):
321         self.base_url = base_url
322         self.token = token
323         self.headers, self.params = dict(), dict()
324
325     def _init_thread_limit(self, limit=1):
326         assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
327         self._thread_limit = limit
328         self._elapsed_old = 0.0
329         self._elapsed_new = 0.0
330
331     def _watch_thread_limit(self, threadlist):
332         self._thread_limit = getattr(self, '_thread_limit', 1)
333         self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
334         self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
335         recvlog.debug('# running threads: %s' % len(threadlist))
336         if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
337                 self._thread_limit < self.MAX_THREADS):
338             self._thread_limit += 1
339         elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
340             self._thread_limit -= 1
341
342         self._elapsed_old = self._elapsed_new
343         if len(threadlist) >= self._thread_limit:
344             self._elapsed_new = 0.0
345             for thread in threadlist:
346                 begin_time = time()
347                 thread.join()
348                 self._elapsed_new += time() - begin_time
349             self._elapsed_new = self._elapsed_new / len(threadlist)
350             return []
351         return threadlist
352
353     def _raise_for_status(self, r):
354         clienterrorlog.debug('raise err from [%s] of type[%s]' % (r, type(r)))
355         status_msg = getattr(r, 'status', None) or ''
356         try:
357             message = '%s %s\n' % (status_msg, r.text)
358         except:
359             message = '%s %s\n' % (status_msg, r)
360         status = getattr(r, 'status_code', getattr(r, 'status', 0))
361         raise ClientError(message, status=status)
362
363     def set_header(self, name, value, iff=True):
364         """Set a header 'name':'value'"""
365         if value is not None and iff:
366             self.headers[name] = value
367
368     def set_param(self, name, value=None, iff=True):
369         if iff:
370             self.params[name] = value
371
372     def request(
373             self, method, path,
374             async_headers=dict(), async_params=dict(),
375             **kwargs):
376         """Commit an HTTP request to base_url/path
377         Requests are commited to and performed by Request/ResponseManager
378         These classes perform a lazy http request. Present method, by default,
379         enforces them to perform the http call. Hint: call present method with
380         success=None to get a non-performed ResponseManager object.
381         """
382         assert isinstance(method, str) or isinstance(method, unicode)
383         assert method
384         assert isinstance(path, str) or isinstance(path, unicode)
385         try:
386             headers = dict(self.headers)
387             headers.update(async_headers)
388             params = dict(self.params)
389             params.update(async_params)
390             success = kwargs.pop('success', 200)
391             data = kwargs.pop('data', None)
392             headers.setdefault('X-Auth-Token', self.token)
393             if 'json' in kwargs:
394                 data = dumps(kwargs.pop('json'))
395                 headers.setdefault('Content-Type', 'application/json')
396             if data:
397                 headers.setdefault('Content-Length', '%s' % len(data))
398
399             sendlog.debug('COMMIT %s @ %s\t[%s]', method, self.base_url, self)
400             req = RequestManager(
401                 method, self.base_url, path,
402                 data=data, headers=headers, params=params)
403             #  req.log()
404             r = ResponseManager(req)
405         finally:
406             self.headers = dict()
407             self.params = dict()
408
409         if success is not None:
410             # Success can either be an int or a collection
411             success = (success,) if isinstance(success, int) else success
412             if r.status_code not in success:
413                 self._raise_for_status(r)
414         return r
415
416     def delete(self, path, **kwargs):
417         return self.request('delete', path, **kwargs)
418
419     def get(self, path, **kwargs):
420         return self.request('get', path, **kwargs)
421
422     def head(self, path, **kwargs):
423         return self.request('head', path, **kwargs)
424
425     def post(self, path, **kwargs):
426         return self.request('post', path, **kwargs)
427
428     def put(self, path, **kwargs):
429         return self.request('put', path, **kwargs)
430
431     def copy(self, path, **kwargs):
432         return self.request('copy', path, **kwargs)
433
434     def move(self, path, **kwargs):
435         return self.request('move', path, **kwargs)