db55d81120be40a301c3738865270d66a56114b2
[kamaki] / kamaki / clients / __init__.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, self.list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, self.list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from urllib2 import quote
35 from threading import Thread
36 from json import dumps, loads
37 from time import time
38 import logging
39 from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
40 from kamaki.clients.connection.errors import HTTPConnectionError
41 from kamaki.clients.connection.errors import HTTPResponseError
42
43 sendlog = logging.getLogger('clients.send')
44 datasendlog = logging.getLogger('data.send')
45 recvlog = logging.getLogger('clients.recv')
46 datarecvlog = logging.getLogger('data.recv')
47
48
49 class ClientError(Exception):
50     def __init__(self, message, status=0, details=None):
51         try:
52             message += '' if message and message[-1] == '\n' else '\n'
53             serv_stat, sep, new_msg = message.partition('{')
54             new_msg = sep + new_msg
55             json_msg = loads(new_msg)
56             key = json_msg.keys()[0]
57
58             json_msg = json_msg[key]
59             message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
60                 if 'message' in json_msg else '%s %s' % (serv_stat, key)
61             if 'code' in json_msg:
62                 status = json_msg['code']
63             if 'details' in json_msg:
64                 if not details:
65                     details = []
66                 elif not isinstance(details, list):
67                     details = [details]
68                 if json_msg['details']:
69                     details.append(json_msg['details'])
70         except:
71             pass
72
73         super(ClientError, self).__init__(message)
74         self.status = status
75         self.details = details if details else []
76
77
78 class SilentEvent(Thread):
79     """ Thread-run method(*args, **kwargs)
80         put exception in exception_bucket
81     """
82     def __init__(self, method, *args, **kwargs):
83         super(self.__class__, self).__init__()
84         self.method = method
85         self.args = args
86         self.kwargs = kwargs
87
88     @property
89     def exception(self):
90         return getattr(self, '_exception', False)
91
92     @property
93     def value(self):
94         return getattr(self, '_value', None)
95
96     def run(self):
97         try:
98             self._value = self.method(*(self.args), **(self.kwargs))
99         except Exception as e:
100             recvlog.debug('Thread %s got exception %s\n<%s %s' % (
101                 self,
102                 type(e),
103                 e.status if isinstance(e, ClientError) else '',
104                 e))
105             self._exception = e
106
107
108 class Client(object):
109     POOL_SIZE = 7
110
111     def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
112         self.base_url = base_url
113         self.token = token
114         self.headers = {}
115         self.DATE_FORMATS = [
116             '%a %b %d %H:%M:%S %Y',
117             '%A, %d-%b-%y %H:%M:%S GMT',
118             '%a, %d %b %Y %H:%M:%S GMT']
119         self.http_client = http_client
120
121     def _init_thread_limit(self, limit=1):
122         self._thread_limit = limit
123         self._elapsed_old = 0.0
124         self._elapsed_new = 0.0
125
126     def _watch_thread_limit(self, threadlist):
127         recvlog.debug('# running threads: %s' % len(threadlist))
128         if (self._elapsed_old > self._elapsed_new) and (
129                 self._thread_limit < self.POOL_SIZE):
130             self._thread_limit += 1
131         elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
132             self._thread_limit -= 1
133
134         self._elapsed_old = self._elapsed_new
135         if len(threadlist) >= self._thread_limit:
136             self._elapsed_new = 0.0
137             for thread in threadlist:
138                 begin_time = time()
139                 thread.join()
140                 self._elapsed_new += time() - begin_time
141             self._elapsed_new = self._elapsed_new / len(threadlist)
142             return []
143         return threadlist
144
145     def _raise_for_status(self, r):
146         status_msg = getattr(r, 'status', '')
147         try:
148             message = '%s %s\n' % (status_msg, r.text)
149         except:
150             message = '%s %s\n' % (status_msg, r)
151         status = getattr(r, 'status_code', getattr(r, 'status', 0))
152         raise ClientError(message, status=status)
153
154     def set_header(self, name, value, iff=True):
155         """Set a header 'name':'value'"""
156         if value is not None and iff:
157             self.http_client.set_header(name, value)
158
159     def set_param(self, name, value=None, iff=True):
160         if iff:
161             self.http_client.set_param(name, value)
162
163     def set_default_header(self, name, value):
164         self.http_client.headers.setdefault(name, value)
165
166     def request(
167             self,
168             method,
169             path,
170             async_headers={},
171             async_params={},
172             **kwargs):
173         """In threaded/asynchronous requests, headers and params are not safe
174         Therefore, the standard self.set_header/param system can be used only
175         for headers and params that are common for all requests. All other
176         params and headers should passes as
177         @param async_headers
178         @async_params
179         E.g. in most queries the 'X-Auth-Token' header might be the same for
180         all, but the 'Range' header might be different from request to request.
181         """
182         try:
183             success = kwargs.pop('success', 200)
184
185             data = kwargs.pop('data', None)
186             self.set_default_header('X-Auth-Token', self.token)
187
188             if 'json' in kwargs:
189                 data = dumps(kwargs.pop('json'))
190                 self.set_default_header('Content-Type', 'application/json')
191             if data:
192                 self.set_default_header('Content-Length', unicode(len(data)))
193
194             sendlog.info('perform a %s @ %s', method, self.base_url)
195
196             self.http_client.url = self.base_url
197             self.http_client.path = quote(path)
198             r = self.http_client.perform_request(
199                 method,
200                 data,
201                 async_headers,
202                 async_params)
203
204             req = self.http_client
205             sendlog.info('%s %s', method, req.url)
206             headers = dict(req.headers)
207             headers.update(async_headers)
208
209             for key, val in headers.items():
210                 sendlog.info('\t%s: %s', key, val)
211             sendlog.info('')
212             if data:
213                 datasendlog.info(data)
214
215             recvlog.info('%d %s', r.status_code, r.status)
216             for key, val in r.headers.items():
217                 recvlog.info('%s: %s', key, val)
218             if r.content:
219                 datarecvlog.info(r.content)
220
221         except (HTTPResponseError, HTTPConnectionError) as err:
222             from traceback import format_stack
223             recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
224             self.http_client.reset_headers()
225             self.http_client.reset_params()
226             errstr = '%s' % err
227             if not errstr:
228                 errstr = ('%s' % type(err))[7:-2]
229             status = getattr(err, 'status', getattr(err, 'errno', 0))
230             raise ClientError('%s\n' % errstr, status=status)
231
232         self.http_client.reset_headers()
233         self.http_client.reset_params()
234
235         if success is not None:
236             # Success can either be an in or a collection
237             success = (success,) if isinstance(success, int) else success
238             if r.status_code not in success:
239                 r.release()
240                 self._raise_for_status(r)
241         return r
242
243     def delete(self, path, **kwargs):
244         return self.request('delete', path, **kwargs)
245
246     def get(self, path, **kwargs):
247         return self.request('get', path, **kwargs)
248
249     def head(self, path, **kwargs):
250         return self.request('head', path, **kwargs)
251
252     def post(self, path, **kwargs):
253         return self.request('post', path, **kwargs)
254
255     def put(self, path, **kwargs):
256         return self.request('put', path, **kwargs)
257
258     def copy(self, path, **kwargs):
259         return self.request('copy', path, **kwargs)
260
261     def move(self, path, **kwargs):
262         return self.request('move', path, **kwargs)