Fix pep8 compliance issues everywhere
[kamaki] / kamaki / clients / __init__.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, self.list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, self.list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import Thread
35 from json import dumps, loads
36 from time import time
37 import logging
38 from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
39 from kamaki.clients.connection.errors import HTTPConnectionError
40 from kamaki.clients.connection.errors import HTTPResponseError
41
42 sendlog = logging.getLogger('clients.send')
43 datasendlog = logging.getLogger('data.send')
44 recvlog = logging.getLogger('clients.recv')
45 datarecvlog = logging.getLogger('data.recv')
46
47
48 class ClientError(Exception):
49     def __init__(self, message, status=0, details=None):
50         try:
51             message += '' if message and message[-1] == '\n' else '\n'
52             serv_stat, sep, new_msg = message.partition('{')
53             new_msg = sep + new_msg
54             json_msg = loads(new_msg)
55             key = json_msg.keys()[0]
56
57             json_msg = json_msg[key]
58             message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
59                 if 'message' in json_msg else '%s %s' % (serv_stat, key)
60             if 'code' in json_msg:
61                 status = json_msg['code']
62             if 'details' in json_msg:
63                 if not details:
64                     details = []
65                 elif not isinstance(details, list):
66                     details = [details]
67                 if json_msg['details']:
68                     details.append(json_msg['details'])
69         except:
70             pass
71
72         super(ClientError, self).__init__(message)
73         self.status = status
74         self.details = details if details else []
75
76
77 class SilentEvent(Thread):
78     """ Thread-run method(*args, **kwargs)
79         put exception in exception_bucket
80     """
81     def __init__(self, method, *args, **kwargs):
82         super(self.__class__, self).__init__()
83         self.method = method
84         self.args = args
85         self.kwargs = kwargs
86
87     @property
88     def exception(self):
89         return getattr(self, '_exception', False)
90
91     @property
92     def value(self):
93         return getattr(self, '_value', None)
94
95     def run(self):
96         try:
97             self._value = self.method(*(self.args), **(self.kwargs))
98         except Exception as e:
99             recvlog.debug('Thread %s got exception %s\n<%s %s' % (
100                 self,
101                 type(e),
102                 e.status if isinstance(e, ClientError) else '',
103                 e))
104             self._exception = e
105
106
107 class Client(object):
108     POOL_SIZE = 7
109
110     def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
111         self.base_url = base_url
112         self.token = token
113         self.headers = {}
114         self.DATE_FORMATS = [
115             '%a %b %d %H:%M:%S %Y',
116             '%A, %d-%b-%y %H:%M:%S GMT',
117             '%a, %d %b %Y %H:%M:%S GMT']
118         self.http_client = http_client
119
120     def _init_thread_limit(self, limit=1):
121         self._thread_limit = limit
122         self._elapsed_old = 0.0
123         self._elapsed_new = 0.0
124
125     def _watch_thread_limit(self, threadlist):
126         recvlog.debug('# running threads: %s' % len(threadlist))
127         if (self._elapsed_old > self._elapsed_new) and (
128             self._thread_limit < self.POOL_SIZE):
129                 self._thread_limit += 1
130         elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
131             self._thread_limit -= 1
132
133         self._elapsed_old = self._elapsed_new
134         if len(threadlist) >= self._thread_limit:
135             self._elapsed_new = 0.0
136             for thread in threadlist:
137                 begin_time = time()
138                 thread.join()
139                 self._elapsed_new += time() - begin_time
140             self._elapsed_new = self._elapsed_new / len(threadlist)
141             return []
142         return threadlist
143
144     def _raise_for_status(self, r):
145         status_msg = getattr(r, 'status', '')
146         try:
147             message = '%s %s\n' % (status_msg, r.text)
148         except:
149             message = '%s %s\n' % (status_msg, r)
150         status = getattr(r, 'status_code', getattr(r, 'status', 0))
151         raise ClientError(message, status=status)
152
153     def set_header(self, name, value, iff=True):
154         """Set a header 'name':'value'"""
155         if value is not None and iff:
156             self.http_client.set_header(name, value)
157
158     def set_param(self, name, value=None, iff=True):
159         if iff:
160             self.http_client.set_param(name, value)
161
162     def set_default_header(self, name, value):
163         self.http_client.headers.setdefault(name, value)
164
165     def request(
166             self,
167             method,
168             path,
169             async_headers={},
170             async_params={},
171             **kwargs):
172         """In threaded/asynchronous requests, headers and params are not safe
173         Therefore, the standard self.set_header/param system can be used only
174         for headers and params that are common for all requests. All other
175         params and headers should passes as
176         @param async_headers
177         @async_params
178         E.g. in most queries the 'X-Auth-Token' header might be the same for
179         all, but the 'Range' header might be different from request to request.
180         """
181         try:
182             success = kwargs.pop('success', 200)
183
184             data = kwargs.pop('data', None)
185             self.set_default_header('X-Auth-Token', self.token)
186
187             if 'json' in kwargs:
188                 data = dumps(kwargs.pop('json'))
189                 self.set_default_header('Content-Type', 'application/json')
190             if data:
191                 self.set_default_header('Content-Length', unicode(len(data)))
192
193             sendlog.info('perform a %s @ %s', method, self.base_url)
194
195             self.http_client.url = self.base_url
196             self.http_client.path = path
197             r = self.http_client.perform_request(
198                 method,
199                 data,
200                 async_headers,
201                 async_params)
202
203             req = self.http_client
204             sendlog.info('%s %s', method, req.url)
205             headers = dict(req.headers)
206             headers.update(async_headers)
207
208             for key, val in headers.items():
209                 sendlog.info('\t%s: %s', key, val)
210             sendlog.info('')
211             if data:
212                 datasendlog.info(data)
213
214             recvlog.info('%d %s', r.status_code, r.status)
215             for key, val in r.headers.items():
216                 recvlog.info('%s: %s', key, val)
217             if r.content:
218                 datarecvlog.info(r.content)
219
220         except (HTTPResponseError, HTTPConnectionError) as err:
221             from traceback import format_stack
222             recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
223             self.http_client.reset_headers()
224             self.http_client.reset_params()
225             errstr = '%s' % err
226             if not errstr:
227                 errstr = ('%s' % type(err))[7:-2]
228             status = getattr(err, 'status', getattr(err, 'errno', 0))
229             raise ClientError('%s\n' % errstr, status=status)
230
231         self.http_client.reset_headers()
232         self.http_client.reset_params()
233
234         if success is not None:
235             # Success can either be an in or a collection
236             success = (success,) if isinstance(success, int) else success
237             if r.status_code not in success:
238                 r.release()
239                 self._raise_for_status(r)
240         return r
241
242     def delete(self, path, **kwargs):
243         return self.request('delete', path, **kwargs)
244
245     def get(self, path, **kwargs):
246         return self.request('get', path, **kwargs)
247
248     def head(self, path, **kwargs):
249         return self.request('head', path, **kwargs)
250
251     def post(self, path, **kwargs):
252         return self.request('post', path, **kwargs)
253
254     def put(self, path, **kwargs):
255         return self.request('put', path, **kwargs)
256
257     def copy(self, path, **kwargs):
258         return self.request('copy', path, **kwargs)
259
260     def move(self, path, **kwargs):
261         return self.request('move', path, **kwargs)