Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ cf215a96

History | View | Annotate | Download (9.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote
35
from threading import Thread
36
from json import dumps, loads
37
from time import time
38
import logging
39
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
40
from kamaki.clients.connection.errors import HTTPConnectionError
41
from kamaki.clients.connection.errors import HTTPResponseError
42

    
43
sendlog = logging.getLogger('clients.send')
44
datasendlog = logging.getLogger('data.send')
45
recvlog = logging.getLogger('clients.recv')
46
datarecvlog = logging.getLogger('data.recv')
47

    
48

    
49
class ClientError(Exception):
50
    def __init__(self, message, status=0, details=None):
51
        try:
52
            message += '' if message and message[-1] == '\n' else '\n'
53
            serv_stat, sep, new_msg = message.partition('{')
54
            new_msg = sep + new_msg
55
            json_msg = loads(new_msg)
56
            key = json_msg.keys()[0]
57

    
58
            json_msg = json_msg[key]
59
            message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
60
                if 'message' in json_msg else '%s %s' % (serv_stat, key)
61
            if 'code' in json_msg:
62
                status = json_msg['code']
63
            if 'details' in json_msg:
64
                if not details:
65
                    details = []
66
                elif not isinstance(details, list):
67
                    details = [details]
68
                if json_msg['details']:
69
                    details.append(json_msg['details'])
70
        except:
71
            pass
72

    
73
        super(ClientError, self).__init__(message)
74
        self.status = status
75
        self.details = details if details else []
76

    
77

    
78
class SilentEvent(Thread):
79
    """ Thread-run method(*args, **kwargs)
80
        put exception in exception_bucket
81
    """
82
    def __init__(self, method, *args, **kwargs):
83
        super(self.__class__, self).__init__()
84
        self.method = method
85
        self.args = args
86
        self.kwargs = kwargs
87

    
88
    @property
89
    def exception(self):
90
        return getattr(self, '_exception', False)
91

    
92
    @property
93
    def value(self):
94
        return getattr(self, '_value', None)
95

    
96
    def run(self):
97
        try:
98
            self._value = self.method(*(self.args), **(self.kwargs))
99
        except Exception as e:
100
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
101
                self,
102
                type(e),
103
                e.status if isinstance(e, ClientError) else '',
104
                e))
105
            self._exception = e
106

    
107

    
108
class Client(object):
109
    POOL_SIZE = 7
110

    
111
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
112
        self.base_url = base_url
113
        self.token = token
114
        self.headers = {}
115
        self.DATE_FORMATS = [
116
            '%a %b %d %H:%M:%S %Y',
117
            '%A, %d-%b-%y %H:%M:%S GMT',
118
            '%a, %d %b %Y %H:%M:%S GMT']
119
        self.http_client = http_client
120

    
121
    def _init_thread_limit(self, limit=1):
122
        self._thread_limit = limit
123
        self._elapsed_old = 0.0
124
        self._elapsed_new = 0.0
125

    
126
    def _watch_thread_limit(self, threadlist):
127
        recvlog.debug('# running threads: %s' % len(threadlist))
128
        if (self._elapsed_old > self._elapsed_new) and (
129
                self._thread_limit < self.POOL_SIZE):
130
            self._thread_limit += 1
131
        elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
132
            self._thread_limit -= 1
133

    
134
        self._elapsed_old = self._elapsed_new
135
        if len(threadlist) >= self._thread_limit:
136
            self._elapsed_new = 0.0
137
            for thread in threadlist:
138
                begin_time = time()
139
                thread.join()
140
                self._elapsed_new += time() - begin_time
141
            self._elapsed_new = self._elapsed_new / len(threadlist)
142
            return []
143
        return threadlist
144

    
145
    def _raise_for_status(self, r):
146
        status_msg = getattr(r, 'status', '')
147
        try:
148
            message = '%s %s\n' % (status_msg, r.text)
149
        except:
150
            message = '%s %s\n' % (status_msg, r)
151
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
152
        raise ClientError(message, status=status)
153

    
154
    def set_header(self, name, value, iff=True):
155
        """Set a header 'name':'value'"""
156
        if value is not None and iff:
157
            self.http_client.set_header(name, value)
158

    
159
    def set_param(self, name, value=None, iff=True):
160
        if iff:
161
            self.http_client.set_param(name, value)
162

    
163
    def set_default_header(self, name, value):
164
        self.http_client.headers.setdefault(name, value)
165

    
166
    def request(
167
            self,
168
            method,
169
            path,
170
            async_headers={},
171
            async_params={},
172
            **kwargs):
173
        """In threaded/asynchronous requests, headers and params are not safe
174
        Therefore, the standard self.set_header/param system can be used only
175
        for headers and params that are common for all requests. All other
176
        params and headers should passes as
177
        @param async_headers
178
        @async_params
179
        E.g. in most queries the 'X-Auth-Token' header might be the same for
180
        all, but the 'Range' header might be different from request to request.
181
        """
182
        try:
183
            success = kwargs.pop('success', 200)
184

    
185
            data = kwargs.pop('data', None)
186
            self.set_default_header('X-Auth-Token', self.token)
187

    
188
            if 'json' in kwargs:
189
                data = dumps(kwargs.pop('json'))
190
                self.set_default_header('Content-Type', 'application/json')
191
            if data:
192
                self.set_default_header('Content-Length', '%s' % len(data))
193

    
194
            sendlog.info('perform a %s @ %s', method, self.base_url)
195

    
196
            self.http_client.url = self.base_url
197
            self.http_client.path = quote(path.encode('utf8'))
198
            r = self.http_client.perform_request(
199
                method,
200
                data,
201
                async_headers,
202
                async_params)
203

    
204
            req = self.http_client
205
            sendlog.info('%s %s', method, req.url)
206
            headers = dict(req.headers)
207
            headers.update(async_headers)
208

    
209
            for key, val in headers.items():
210
                sendlog.info('\t%s: %s', key, val)
211
            sendlog.info('')
212
            if data:
213
                datasendlog.info(data)
214

    
215
            recvlog.info('%d %s', r.status_code, r.status)
216
            for key, val in r.headers.items():
217
                recvlog.info('%s: %s', key, val)
218
            if r.content:
219
                datarecvlog.info(r.content)
220

    
221
        except (HTTPResponseError, HTTPConnectionError) as err:
222
            from traceback import format_stack
223
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
224
            self.http_client.reset_headers()
225
            self.http_client.reset_params()
226
            errstr = '%s' % err
227
            if not errstr:
228
                errstr = ('%s' % type(err))[7:-2]
229
            status = getattr(err, 'status', getattr(err, 'errno', 0))
230
            raise ClientError('%s\n' % errstr, status=status)
231

    
232
        self.http_client.reset_headers()
233
        self.http_client.reset_params()
234

    
235
        if success is not None:
236
            # Success can either be an in or a collection
237
            success = (success,) if isinstance(success, int) else success
238
            if r.status_code not in success:
239
                r.release()
240
                self._raise_for_status(r)
241
        return r
242

    
243
    def delete(self, path, **kwargs):
244
        return self.request('delete', path, **kwargs)
245

    
246
    def get(self, path, **kwargs):
247
        return self.request('get', path, **kwargs)
248

    
249
    def head(self, path, **kwargs):
250
        return self.request('head', path, **kwargs)
251

    
252
    def post(self, path, **kwargs):
253
        return self.request('post', path, **kwargs)
254

    
255
    def put(self, path, **kwargs):
256
        return self.request('put', path, **kwargs)
257

    
258
    def copy(self, path, **kwargs):
259
        return self.request('copy', path, **kwargs)
260

    
261
    def move(self, path, **kwargs):
262
        return self.request('move', path, **kwargs)