Add / at the end of images path (plankton)
[kamaki] / kamaki / clients / __init__.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, self.list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, self.list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from urllib2 import quote
35 from threading import Thread
36 from json import dumps, loads
37 from time import time
38 import logging
39 from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
40 from kamaki.clients.connection.errors import HTTPConnectionError
41 from kamaki.clients.connection.errors import HTTPResponseError
42
43 sendlog = logging.getLogger('clients.send')
44 datasendlog = logging.getLogger('data.send')
45 recvlog = logging.getLogger('clients.recv')
46 datarecvlog = logging.getLogger('data.recv')
47
48
49 class ClientError(Exception):
50     def __init__(self, message, status=0, details=None):
51         try:
52             message += '' if message and message[-1] == '\n' else '\n'
53             serv_stat, sep, new_msg = message.partition('{')
54             new_msg = sep + new_msg
55             json_msg = loads(new_msg)
56             key = json_msg.keys()[0]
57
58             json_msg = json_msg[key]
59             message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
60                 if 'message' in json_msg else '%s %s' % (serv_stat, key)
61             if 'code' in json_msg:
62                 status = json_msg['code']
63             if 'details' in json_msg:
64                 if not details:
65                     details = []
66                 elif not isinstance(details, list):
67                     details = [details]
68                 if json_msg['details']:
69                     details.append(json_msg['details'])
70         except:
71             pass
72
73         super(ClientError, self).__init__(message)
74         self.status = status
75         self.details = details if details else []
76
77
78 class SilentEvent(Thread):
79     """ Thread-run method(*args, **kwargs)
80         put exception in exception_bucket
81     """
82     def __init__(self, method, *args, **kwargs):
83         super(self.__class__, self).__init__()
84         self.method = method
85         self.args = args
86         self.kwargs = kwargs
87
88     @property
89     def exception(self):
90         return getattr(self, '_exception', False)
91
92     @property
93     def value(self):
94         return getattr(self, '_value', None)
95
96     def run(self):
97         try:
98             self._value = self.method(*(self.args), **(self.kwargs))
99         except Exception as e:
100             recvlog.debug('Thread %s got exception %s\n<%s %s' % (
101                 self,
102                 type(e),
103                 e.status if isinstance(e, ClientError) else '',
104                 e))
105             self._exception = e
106
107
108 class Client(object):
109     POOL_SIZE = 7
110
111     def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
112         self.base_url = base_url
113         self.token = token
114         self.headers = {}
115         self.DATE_FORMATS = [
116             '%a %b %d %H:%M:%S %Y',
117             '%A, %d-%b-%y %H:%M:%S GMT',
118             '%a, %d %b %Y %H:%M:%S GMT']
119         self.http_client = http_client
120
121     def _init_thread_limit(self, limit=1):
122         self._thread_limit = limit
123         self._elapsed_old = 0.0
124         self._elapsed_new = 0.0
125
126     def _watch_thread_limit(self, threadlist):
127         recvlog.debug('# running threads: %s' % len(threadlist))
128         if (self._elapsed_old > self._elapsed_new) and (
129                 self._thread_limit < self.POOL_SIZE):
130             self._thread_limit += 1
131         elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
132             self._thread_limit -= 1
133
134         self._elapsed_old = self._elapsed_new
135         if len(threadlist) >= self._thread_limit:
136             self._elapsed_new = 0.0
137             for thread in threadlist:
138                 begin_time = time()
139                 thread.join()
140                 self._elapsed_new += time() - begin_time
141             self._elapsed_new = self._elapsed_new / len(threadlist)
142             return []
143         return threadlist
144
145     def _raise_for_status(self, r):
146         status_msg = getattr(r, 'status', '')
147         try:
148             message = '%s %s\n' % (status_msg, r.text)
149         except:
150             message = '%s %s\n' % (status_msg, r)
151         status = getattr(r, 'status_code', getattr(r, 'status', 0))
152         raise ClientError(message, status=status)
153
154     def set_header(self, name, value, iff=True):
155         """Set a header 'name':'value'"""
156         if value is not None and iff:
157             self.http_client.set_header(name, value)
158
159     def set_param(self, name, value=None, iff=True):
160         if iff:
161             self.http_client.set_param(name, value)
162
163     def set_default_header(self, name, value):
164         self.http_client.headers.setdefault(name, value)
165
166     def request(
167             self,
168             method,
169             path,
170             async_headers={},
171             async_params={},
172             **kwargs):
173         """In threaded/asynchronous requests, headers and params are not safe
174         Therefore, the standard self.set_header/param system can be used only
175         for headers and params that are common for all requests. All other
176         params and headers should passes as
177         @param async_headers
178         @async_params
179         E.g. in most queries the 'X-Auth-Token' header might be the same for
180         all, but the 'Range' header might be different from request to request.
181         """
182         try:
183             success = kwargs.pop('success', 200)
184
185             data = kwargs.pop('data', None)
186             self.set_default_header('X-Auth-Token', self.token)
187
188             if 'json' in kwargs:
189                 data = dumps(kwargs.pop('json'))
190                 self.set_default_header('Content-Type', 'application/json')
191             if data:
192                 self.set_default_header('Content-Length', '%s' % len(data))
193
194             sendlog.info('perform a %s @ %s', method, self.base_url)
195
196             self.http_client.url = self.base_url + (
197                 '/' if (self.base_url and self.base_url[-1]) != '/' else '')
198             self.http_client.path = quote(path.encode('utf8'))
199             r = self.http_client.perform_request(
200                 method,
201                 data,
202                 async_headers,
203                 async_params)
204
205             req = self.http_client
206             sendlog.info('%s %s', method, req.url)
207             headers = dict(req.headers)
208             headers.update(async_headers)
209
210             for key, val in headers.items():
211                 sendlog.info('\t%s: %s', key, val)
212             sendlog.info('')
213             if data:
214                 datasendlog.info(data)
215
216             recvlog.info('%d %s', r.status_code, r.status)
217             for key, val in r.headers.items():
218                 recvlog.info('%s: %s', key, val)
219             if r.content:
220                 datarecvlog.info(r.content)
221
222         except (HTTPResponseError, HTTPConnectionError) as err:
223             from traceback import format_stack
224             recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
225             self.http_client.reset_headers()
226             self.http_client.reset_params()
227             errstr = '%s' % err
228             if not errstr:
229                 errstr = ('%s' % type(err))[7:-2]
230             status = getattr(err, 'status', getattr(err, 'errno', 0))
231             raise ClientError('%s\n' % errstr, status=status)
232
233         self.http_client.reset_headers()
234         self.http_client.reset_params()
235
236         if success is not None:
237             # Success can either be an in or a collection
238             success = (success,) if isinstance(success, int) else success
239             if r.status_code not in success:
240                 r.release()
241                 self._raise_for_status(r)
242         return r
243
244     def delete(self, path, **kwargs):
245         return self.request('delete', path, **kwargs)
246
247     def get(self, path, **kwargs):
248         return self.request('get', path, **kwargs)
249
250     def head(self, path, **kwargs):
251         return self.request('head', path, **kwargs)
252
253     def post(self, path, **kwargs):
254         return self.request('post', path, **kwargs)
255
256     def put(self, path, **kwargs):
257         return self.request('put', path, **kwargs)
258
259     def copy(self, path, **kwargs):
260         return self.request('copy', path, **kwargs)
261
262     def move(self, path, **kwargs):
263         return self.request('move', path, **kwargs)