Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 24ff0a35

History | View | Annotate | Download (9.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import Thread
35
from json import dumps, loads
36
from time import time
37
import logging
38
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
39
from kamaki.clients.connection.errors import HTTPConnectionError
40
from kamaki.clients.connection.errors import HTTPResponseError
41

    
42
sendlog = logging.getLogger('clients.send')
43
datasendlog = logging.getLogger('data.send')
44
recvlog = logging.getLogger('clients.recv')
45
datarecvlog = logging.getLogger('data.recv')
46

    
47

    
48
class ClientError(Exception):
49
    def __init__(self, message, status=0, details=None):
50
        try:
51
            message += '' if message and message[-1] == '\n' else '\n'
52
            serv_stat, sep, new_msg = message.partition('{')
53
            new_msg = sep + new_msg
54
            json_msg = loads(new_msg)
55
            key = json_msg.keys()[0]
56

    
57
            json_msg = json_msg[key]
58
            message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
59
                if 'message' in json_msg else '%s %s' % (serv_stat, key)
60
            if 'code' in json_msg:
61
                status = json_msg['code']
62
            if 'details' in json_msg:
63
                if not details:
64
                    details = []
65
                elif not isinstance(details, list):
66
                    details = [details]
67
                if json_msg['details']:
68
                    details.append(json_msg['details'])
69
        except:
70
            pass
71

    
72
        super(ClientError, self).__init__(message)
73
        self.status = status
74
        self.details = details if details else []
75

    
76

    
77
class SilentEvent(Thread):
78
    """ Thread-run method(*args, **kwargs)
79
        put exception in exception_bucket
80
    """
81
    def __init__(self, method, *args, **kwargs):
82
        super(self.__class__, self).__init__()
83
        self.method = method
84
        self.args = args
85
        self.kwargs = kwargs
86

    
87
    @property
88
    def exception(self):
89
        return getattr(self, '_exception', False)
90

    
91
    @property
92
    def value(self):
93
        return getattr(self, '_value', None)
94

    
95
    def run(self):
96
        try:
97
            self._value = self.method(*(self.args), **(self.kwargs))
98
        except Exception as e:
99
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
100
                self,
101
                type(e),
102
                e.status if isinstance(e, ClientError) else '',
103
                e))
104
            self._exception = e
105

    
106

    
107
class Client(object):
108
    POOL_SIZE = 7
109

    
110
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
111
        self.base_url = base_url
112
        self.token = token
113
        self.headers = {}
114
        self.DATE_FORMATS = [
115
            '%a %b %d %H:%M:%S %Y',
116
            '%A, %d-%b-%y %H:%M:%S GMT',
117
            '%a, %d %b %Y %H:%M:%S GMT']
118
        self.http_client = http_client
119

    
120
    def _init_thread_limit(self, limit=1):
121
        self._thread_limit = limit
122
        self._elapsed_old = 0.0
123
        self._elapsed_new = 0.0
124

    
125
    def _watch_thread_limit(self, threadlist):
126
        recvlog.debug('# running threads: %s' % len(threadlist))
127
        if (self._elapsed_old > self._elapsed_new) and (
128
            self._thread_limit < self.POOL_SIZE):
129
                self._thread_limit += 1
130
        elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
131
            self._thread_limit -= 1
132

    
133
        self._elapsed_old = self._elapsed_new
134
        if len(threadlist) >= self._thread_limit:
135
            self._elapsed_new = 0.0
136
            for thread in threadlist:
137
                begin_time = time()
138
                thread.join()
139
                self._elapsed_new += time() - begin_time
140
            self._elapsed_new = self._elapsed_new / len(threadlist)
141
            return []
142
        return threadlist
143

    
144
    def _raise_for_status(self, r):
145
        status_msg = getattr(r, 'status', '')
146
        try:
147
            message = '%s %s\n' % (status_msg, r.text)
148
        except:
149
            message = '%s %s\n' % (status_msg, r)
150
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
151
        raise ClientError(message, status=status)
152

    
153
    def set_header(self, name, value, iff=True):
154
        """Set a header 'name':'value'"""
155
        if value is not None and iff:
156
            self.http_client.set_header(name, value)
157

    
158
    def set_param(self, name, value=None, iff=True):
159
        if iff:
160
            self.http_client.set_param(name, value)
161

    
162
    def set_default_header(self, name, value):
163
        self.http_client.headers.setdefault(name, value)
164

    
165
    def request(
166
            self,
167
            method,
168
            path,
169
            async_headers={},
170
            async_params={},
171
            **kwargs):
172
        """In threaded/asynchronous requests, headers and params are not safe
173
        Therefore, the standard self.set_header/param system can be used only
174
        for headers and params that are common for all requests. All other
175
        params and headers should passes as
176
        @param async_headers
177
        @async_params
178
        E.g. in most queries the 'X-Auth-Token' header might be the same for
179
        all, but the 'Range' header might be different from request to request.
180
        """
181
        try:
182
            success = kwargs.pop('success', 200)
183

    
184
            data = kwargs.pop('data', None)
185
            self.set_default_header('X-Auth-Token', self.token)
186

    
187
            if 'json' in kwargs:
188
                data = dumps(kwargs.pop('json'))
189
                self.set_default_header('Content-Type', 'application/json')
190
            if data:
191
                self.set_default_header('Content-Length', unicode(len(data)))
192

    
193
            sendlog.info('perform a %s @ %s', method, self.base_url)
194

    
195
            self.http_client.url = self.base_url
196
            self.http_client.path = path
197
            r = self.http_client.perform_request(
198
                method,
199
                data,
200
                async_headers,
201
                async_params)
202

    
203
            req = self.http_client
204
            sendlog.info('%s %s', method, req.url)
205
            headers = dict(req.headers)
206
            headers.update(async_headers)
207

    
208
            for key, val in headers.items():
209
                sendlog.info('\t%s: %s', key, val)
210
            sendlog.info('')
211
            if data:
212
                datasendlog.info(data)
213

    
214
            recvlog.info('%d %s', r.status_code, r.status)
215
            for key, val in r.headers.items():
216
                recvlog.info('%s: %s', key, val)
217
            if r.content:
218
                datarecvlog.info(r.content)
219

    
220
        except (HTTPResponseError, HTTPConnectionError) as err:
221
            from traceback import format_stack
222
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
223
            self.http_client.reset_headers()
224
            self.http_client.reset_params()
225
            errstr = '%s' % err
226
            if not errstr:
227
                errstr = ('%s' % type(err))[7:-2]
228
            status = getattr(err, 'status', getattr(err, 'errno', 0))
229
            raise ClientError('%s\n' % errstr, status=status)
230

    
231
        self.http_client.reset_headers()
232
        self.http_client.reset_params()
233

    
234
        if success is not None:
235
            # Success can either be an in or a collection
236
            success = (success,) if isinstance(success, int) else success
237
            if r.status_code not in success:
238
                r.release()
239
                self._raise_for_status(r)
240
        return r
241

    
242
    def delete(self, path, **kwargs):
243
        return self.request('delete', path, **kwargs)
244

    
245
    def get(self, path, **kwargs):
246
        return self.request('get', path, **kwargs)
247

    
248
    def head(self, path, **kwargs):
249
        return self.request('head', path, **kwargs)
250

    
251
    def post(self, path, **kwargs):
252
        return self.request('post', path, **kwargs)
253

    
254
    def put(self, path, **kwargs):
255
        return self.request('put', path, **kwargs)
256

    
257
    def copy(self, path, **kwargs):
258
        return self.request('copy', path, **kwargs)
259

    
260
    def move(self, path, **kwargs):
261
        return self.request('move', path, **kwargs)