Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ db77d79e

History | View | Annotate | Download (9.2 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import Thread
35
from json import dumps, loads
36
from time import time
37
import logging
38
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
39
from kamaki.clients.connection.errors import HTTPConnectionError
40
from kamaki.clients.connection.errors import HTTPResponseError
41

    
42
sendlog = logging.getLogger('clients.send')
43
datasendlog = logging.getLogger('data.send')
44
recvlog = logging.getLogger('clients.recv')
45
datarecvlog = logging.getLogger('data.recv')
46

    
47

    
48
class ClientError(Exception):
49
    def __init__(self, message, status=0, details=None):
50
        try:
51
            message += '' if message and message[-1] == '\n' else '\n'
52
            serv_stat, sep, new_msg = message.partition('{')
53
            new_msg = sep + new_msg
54
            json_msg = loads(new_msg)
55
            key = json_msg.keys()[0]
56

    
57
            json_msg = json_msg[key]
58
            message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
59
                if 'message' in json_msg else '%s %s' % (serv_stat, key)
60
            if 'code' in json_msg:
61
                status = json_msg['code']
62
            if 'details' in json_msg:
63
                if not details:
64
                    details = []
65
                elif not isinstance(details, list):
66
                    details = [details]
67
                if json_msg['details']:
68
                    details.append(json_msg['details'])
69
        except:
70
            pass
71

    
72
        super(ClientError, self).__init__(message)
73
        self.status = status
74
        self.details = details if details else []
75

    
76

    
77
class SilentEvent(Thread):
78
    """ Thread-run method(*args, **kwargs)
79
        put exception in exception_bucket
80
    """
81
    def __init__(self, method, *args, **kwargs):
82
        super(self.__class__, self).__init__()
83
        self.method = method
84
        self.args = args
85
        self.kwargs = kwargs
86

    
87
    @property
88
    def exception(self):
89
        return getattr(self, '_exception', False)
90

    
91
    @property
92
    def value(self):
93
        return getattr(self, '_value', None)
94

    
95
    def run(self):
96
        try:
97
            self._value = self.method(*(self.args), **(self.kwargs))
98
        except Exception as e:
99
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
100
                self,
101
                type(e),
102
                e.status if isinstance(e, ClientError) else '',
103
                e))
104
            self._exception = e
105

    
106

    
107
class Client(object):
108
    POOL_SIZE = 7
109

    
110
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
111
        self.base_url = base_url
112
        self.token = token
113
        self.headers = {}
114
        self.DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
115
            "%A, %d-%b-%y %H:%M:%S GMT",
116
            "%a, %d %b %Y %H:%M:%S GMT"]
117
        self.http_client = http_client
118

    
119
    def _init_thread_limit(self, limit=1):
120
        self._thread_limit = limit
121
        self._elapsed_old = 0.0
122
        self._elapsed_new = 0.0
123

    
124
    def _watch_thread_limit(self, threadlist):
125
        recvlog.debug('# running threads: %s' % len(threadlist))
126
        if self._elapsed_old > self._elapsed_new\
127
        and self._thread_limit < self.POOL_SIZE:
128
            self._thread_limit += 1
129
        elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
130
            self._thread_limit -= 1
131

    
132
        self._elapsed_old = self._elapsed_new
133
        if len(threadlist) >= self._thread_limit:
134
            self._elapsed_new = 0.0
135
            for thread in threadlist:
136
                begin_time = time()
137
                thread.join()
138
                self._elapsed_new += time() - begin_time
139
            self._elapsed_new = self._elapsed_new / len(threadlist)
140
            return []
141
        return threadlist
142

    
143
    def _raise_for_status(self, r):
144
        status_msg = getattr(r, 'status', '')
145
        try:
146
            message = '%s %s\n' % (status_msg, r.text)
147
        except:
148
            message = '%s %s\n' % (status_msg, r)
149
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
150
        raise ClientError(message, status=status)
151

    
152
    def set_header(self, name, value, iff=True):
153
        """Set a header 'name':'value'"""
154
        if value is not None and iff:
155
            self.http_client.set_header(name, value)
156

    
157
    def set_param(self, name, value=None, iff=True):
158
        if iff:
159
            self.http_client.set_param(name, value)
160

    
161
    def set_default_header(self, name, value):
162
        self.http_client.headers.setdefault(name, value)
163

    
164
    def request(self,
165
        method,
166
        path,
167
        async_headers={},
168
        async_params={},
169
        **kwargs):
170
        """In threaded/asynchronous requests, headers and params are not safe
171
        Therefore, the standard self.set_header/param system can be used only
172
        for headers and params that are common for all requests. All other
173
        params and headers should passes as
174
        @param async_headers
175
        @async_params
176
        E.g. in most queries the 'X-Auth-Token' header might be the same for
177
        all, but the 'Range' header might be different from request to request.
178
        """
179

    
180
        try:
181
            success = kwargs.pop('success', 200)
182

    
183
            data = kwargs.pop('data', None)
184
            self.set_default_header('X-Auth-Token', self.token)
185

    
186
            if 'json' in kwargs:
187
                data = dumps(kwargs.pop('json'))
188
                self.set_default_header('Content-Type', 'application/json')
189
            if data:
190
                self.set_default_header('Content-Length', unicode(len(data)))
191

    
192
            sendlog.info('perform a %s @ %s', method, self.base_url)
193

    
194
            self.http_client.url = self.base_url
195
            self.http_client.path = path
196
            r = self.http_client.perform_request(method,
197
                data,
198
                async_headers,
199
                async_params)
200

    
201
            req = self.http_client
202
            sendlog.info('%s %s', method, req.url)
203
            headers = dict(req.headers)
204
            headers.update(async_headers)
205

    
206
            for key, val in headers.items():
207
                sendlog.info('\t%s: %s', key, val)
208
            sendlog.info('')
209
            if data:
210
                datasendlog.info(data)
211

    
212
            recvlog.info('%d %s', r.status_code, r.status)
213
            for key, val in r.headers.items():
214
                recvlog.info('%s: %s', key, val)
215
            if r.content:
216
                datarecvlog.info(r.content)
217

    
218
        except (HTTPResponseError, HTTPConnectionError) as err:
219
            from traceback import format_stack
220
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
221
            self.http_client.reset_headers()
222
            self.http_client.reset_params()
223
            errstr = '%s' % err
224
            if not errstr:
225
                errstr = ('%s' % type(err))[7:-2]
226
            raise ClientError('%s\n' % errstr,
227
                status=getattr(err, 'status', 0) or getattr(err, 'errno', 0))
228

    
229
        self.http_client.reset_headers()
230
        self.http_client.reset_params()
231

    
232
        if success is not None:
233
            # Success can either be an in or a collection
234
            success = (success,) if isinstance(success, int) else success
235
            if r.status_code not in success:
236
                r.release()
237
                self._raise_for_status(r)
238
        return r
239

    
240
    def delete(self, path, **kwargs):
241
        return self.request('delete', path, **kwargs)
242

    
243
    def get(self, path, **kwargs):
244
        return self.request('get', path, **kwargs)
245

    
246
    def head(self, path, **kwargs):
247
        return self.request('head', path, **kwargs)
248

    
249
    def post(self, path, **kwargs):
250
        return self.request('post', path, **kwargs)
251

    
252
    def put(self, path, **kwargs):
253
        return self.request('put', path, **kwargs)
254

    
255
    def copy(self, path, **kwargs):
256
        return self.request('copy', path, **kwargs)
257

    
258
    def move(self, path, **kwargs):
259
        return self.request('move', path, **kwargs)