6d6e9c59d3d78e6132a8bf91887603af02fd0907
[kamaki] / kamaki / clients / __init__.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, self.list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, self.list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from urllib2 import quote
35 from threading import Thread
36 from json import dumps, loads
37 from time import time
38
39 from kamaki.clients.utils import get_logger, add_file_logger, get_log_filename
40 from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
41 from kamaki.clients.connection.errors import KamakiConnectionError
42 from kamaki.clients.connection.errors import KamakiResponseError
43
44 LOG_TOKEN = False
45 DEBUG_LOG = get_log_filename()
46
47 add_file_logger('clients.send', __name__, filename=DEBUG_LOG)
48 sendlog = get_logger('clients.send')
49 sendlog.debug('Logging location: %s' % DEBUG_LOG)
50 add_file_logger('data.send', __name__, filename=DEBUG_LOG)
51 datasendlog = get_logger('data.send')
52 add_file_logger('clients.recv', __name__, filename=DEBUG_LOG)
53 recvlog = get_logger('clients.recv')
54 add_file_logger('data.recv', __name__, filename=DEBUG_LOG)
55 datarecvlog = get_logger('data.recv')
56
57
58 class ClientError(Exception):
59     def __init__(self, message, status=0, details=None):
60         try:
61             message += '' if message and message[-1] == '\n' else '\n'
62             serv_stat, sep, new_msg = message.partition('{')
63             new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
64             json_msg = loads(new_msg)
65             key = json_msg.keys()[0]
66             serv_stat = serv_stat.strip()
67
68             json_msg = json_msg[key]
69             message = '%s %s (%s)\n' % (
70                 serv_stat,
71                 key,
72                 json_msg['message']) if (
73                     'message' in json_msg) else '%s %s' % (serv_stat, key)
74             status = json_msg.get('code', status)
75             if 'details' in json_msg:
76                 if not details:
77                     details = []
78                 if not isinstance(details, list):
79                     details = [details]
80                 if json_msg['details']:
81                     details.append(json_msg['details'])
82         except Exception:
83             pass
84         finally:
85             while message.endswith('\n\n'):
86                 message = message[:-1]
87             super(ClientError, self).__init__(message)
88             self.status = status if isinstance(status, int) else 0
89             self.details = details if details else []
90
91
92 class SilentEvent(Thread):
93     """ Thread-run method(*args, **kwargs)"""
94     def __init__(self, method, *args, **kwargs):
95         super(self.__class__, self).__init__()
96         self.method = method
97         self.args = args
98         self.kwargs = kwargs
99
100     @property
101     def exception(self):
102         return getattr(self, '_exception', False)
103
104     @property
105     def value(self):
106         return getattr(self, '_value', None)
107
108     def run(self):
109         try:
110             self._value = self.method(*(self.args), **(self.kwargs))
111         except Exception as e:
112             recvlog.debug('Thread %s got exception %s\n<%s %s' % (
113                 self,
114                 type(e),
115                 e.status if isinstance(e, ClientError) else '',
116                 e))
117             self._exception = e
118
119
120 class Client(object):
121
122     def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
123         self.base_url = base_url
124         self.token = token
125         self.headers = {}
126         self.DATE_FORMATS = [
127             '%a %b %d %H:%M:%S %Y',
128             '%A, %d-%b-%y %H:%M:%S GMT',
129             '%a, %d %b %Y %H:%M:%S GMT']
130         self.http_client = http_client
131         self.MAX_THREADS = 7
132
133     def _init_thread_limit(self, limit=1):
134         assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
135         self._thread_limit = limit
136         self._elapsed_old = 0.0
137         self._elapsed_new = 0.0
138
139     def _watch_thread_limit(self, threadlist):
140         self._thread_limit = getattr(self, '_thread_limit', 1)
141         self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
142         self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
143         recvlog.debug('# running threads: %s' % len(threadlist))
144         if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
145                 self._thread_limit < self.MAX_THREADS):
146             self._thread_limit += 1
147         elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
148             self._thread_limit -= 1
149
150         self._elapsed_old = self._elapsed_new
151         if len(threadlist) >= self._thread_limit:
152             self._elapsed_new = 0.0
153             for thread in threadlist:
154                 begin_time = time()
155                 thread.join()
156                 self._elapsed_new += time() - begin_time
157             self._elapsed_new = self._elapsed_new / len(threadlist)
158             return []
159         return threadlist
160
161     def _raise_for_status(self, r):
162         status_msg = getattr(r, 'status', None) or ''
163         try:
164             message = '%s %s\n' % (status_msg, r.text)
165         except:
166             message = '%s %s\n' % (status_msg, r)
167         status = getattr(r, 'status_code', getattr(r, 'status', 0))
168         raise ClientError(message, status=status)
169
170     def set_header(self, name, value, iff=True):
171         """Set a header 'name':'value'"""
172         if value is not None and iff:
173             self.http_client.set_header(name, value)
174
175     def set_param(self, name, value=None, iff=True):
176         if iff:
177             self.http_client.set_param(name, value)
178
179     def request(
180             self,
181             method,
182             path,
183             async_headers={},
184             async_params={},
185             **kwargs):
186         """In threaded/asynchronous requests, headers and params are not safe
187         Therefore, the standard self.set_header/param system can be used only
188         for headers and params that are common for all requests. All other
189         params and headers should passes as
190         @param async_headers
191         @async_params
192         E.g. in most queries the 'X-Auth-Token' header might be the same for
193         all, but the 'Range' header might be different from request to request.
194         """
195         assert isinstance(method, str) or isinstance(method, unicode)
196         assert method
197         assert isinstance(path, str) or isinstance(path, unicode)
198         try:
199             success = kwargs.pop('success', 200)
200             data = kwargs.pop('data', None)
201             self.http_client.headers.setdefault('X-Auth-Token', self.token)
202
203             if 'json' in kwargs:
204                 data = dumps(kwargs.pop('json'))
205                 self.http_client.headers.setdefault(
206                     'Content-Type',
207                     'application/json')
208             if data:
209                 self.http_client.headers.setdefault(
210                     'Content-Length',
211                     '%s' % len(data))
212
213             sendlog.info('perform a %s @ %s', method, self.base_url)
214
215             self.http_client.url = self.base_url
216             self.http_client.path = quote(path.encode('utf8'))
217             r = self.http_client.perform_request(
218                 method,
219                 data,
220                 async_headers,
221                 async_params)
222
223             req = self.http_client
224             sendlog.info('%s %s', method, req.url)
225             headers = dict(req.headers)
226             headers.update(async_headers)
227
228             for key, val in headers.items():
229                 if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
230                     continue
231                 sendlog.info('\t%s: %s', key, val)
232             sendlog.info('')
233             if data:
234                 datasendlog.info(data)
235
236             recvlog.info('%d %s', r.status_code, r.status)
237             for key, val in r.headers.items():
238                 if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
239                     continue
240                 recvlog.info('%s: %s', key, val)
241             if r.content:
242                 datarecvlog.info(r.content)
243
244         except (KamakiResponseError, KamakiConnectionError) as err:
245             from traceback import format_stack
246             recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
247             self.http_client.reset_headers()
248             self.http_client.reset_params()
249             errstr = '%s' % err
250             if not errstr:
251                 errstr = ('%s' % type(err))[7:-2]
252             status = getattr(err, 'status', getattr(err, 'errno', 0))
253             raise ClientError('%s\n' % errstr, status=status)
254         finally:
255             self.http_client.reset_headers()
256             self.http_client.reset_params()
257
258         if success is not None:
259             # Success can either be an int or a collection
260             success = (success,) if isinstance(success, int) else success
261             if r.status_code not in success:
262                 r.release()
263                 self._raise_for_status(r)
264         return r
265
266     def delete(self, path, **kwargs):
267         return self.request('delete', path, **kwargs)
268
269     def get(self, path, **kwargs):
270         return self.request('get', path, **kwargs)
271
272     def head(self, path, **kwargs):
273         return self.request('head', path, **kwargs)
274
275     def post(self, path, **kwargs):
276         return self.request('post', path, **kwargs)
277
278     def put(self, path, **kwargs):
279         return self.request('put', path, **kwargs)
280
281     def copy(self, path, **kwargs):
282         return self.request('copy', path, **kwargs)
283
284     def move(self, path, **kwargs):
285         return self.request('move', path, **kwargs)