Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 9c6c3d69

History | View | Annotate | Download (9.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote
35
from threading import Thread
36
from json import dumps, loads
37
from time import time
38
import logging
39
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
40
from kamaki.clients.connection.errors import KamakiConnectionError
41
from kamaki.clients.connection.errors import KamakiResponseError
42

    
43
sendlog = logging.getLogger('clients.send')
44
datasendlog = logging.getLogger('data.send')
45
recvlog = logging.getLogger('clients.recv')
46
datarecvlog = logging.getLogger('data.recv')
47

    
48

    
49
class ClientError(Exception):
50
    def __init__(self, message, status=0, details=None):
51
        try:
52
            message += '' if message and message[-1] == '\n' else '\n'
53
            serv_stat, sep, new_msg = message.partition('{')
54
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
55
            json_msg = loads(new_msg)
56
            key = json_msg.keys()[0]
57
            serv_stat = serv_stat.strip()
58

    
59
            json_msg = json_msg[key]
60
            message = '%s %s (%s)\n' % (
61
                serv_stat,
62
                key,
63
                json_msg['message']) if (
64
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
65
            status = json_msg.get('code', status)
66
            if 'details' in json_msg:
67
                if not details:
68
                    details = []
69
                if not isinstance(details, list):
70
                    details = [details]
71
                if json_msg['details']:
72
                    details.append(json_msg['details'])
73
        except Exception:
74
            pass
75
        finally:
76
            super(ClientError, self).__init__(message)
77
            self.status = status if isinstance(status, int) else 0
78
            self.details = details if details else []
79

    
80

    
81
class SilentEvent(Thread):
82
    """ Thread-run method(*args, **kwargs)"""
83
    def __init__(self, method, *args, **kwargs):
84
        super(self.__class__, self).__init__()
85
        self.method = method
86
        self.args = args
87
        self.kwargs = kwargs
88

    
89
    @property
90
    def exception(self):
91
        return getattr(self, '_exception', False)
92

    
93
    @property
94
    def value(self):
95
        return getattr(self, '_value', None)
96

    
97
    def run(self):
98
        try:
99
            self._value = self.method(*(self.args), **(self.kwargs))
100
        except Exception as e:
101
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
102
                self,
103
                type(e),
104
                e.status if isinstance(e, ClientError) else '',
105
                e))
106
            self._exception = e
107

    
108

    
109
class Client(object):
110

    
111
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
112
        self.base_url = base_url
113
        self.token = token
114
        self.headers = {}
115
        self.DATE_FORMATS = [
116
            '%a %b %d %H:%M:%S %Y',
117
            '%A, %d-%b-%y %H:%M:%S GMT',
118
            '%a, %d %b %Y %H:%M:%S GMT']
119
        self.http_client = http_client
120
        self.MAX_THREADS = 7
121

    
122
    def _init_thread_limit(self, limit=1):
123
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
124
        self._thread_limit = limit
125
        self._elapsed_old = 0.0
126
        self._elapsed_new = 0.0
127

    
128
    def _watch_thread_limit(self, threadlist):
129
        self._thread_limit = getattr(self, '_thread_limit', 1)
130
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
131
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
132
        recvlog.debug('# running threads: %s' % len(threadlist))
133
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
134
                self._thread_limit < self.MAX_THREADS):
135
            self._thread_limit += 1
136
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
137
            self._thread_limit -= 1
138

    
139
        self._elapsed_old = self._elapsed_new
140
        if len(threadlist) >= self._thread_limit:
141
            self._elapsed_new = 0.0
142
            for thread in threadlist:
143
                begin_time = time()
144
                thread.join()
145
                self._elapsed_new += time() - begin_time
146
            self._elapsed_new = self._elapsed_new / len(threadlist)
147
            return []
148
        return threadlist
149

    
150
    def _raise_for_status(self, r):
151
        status_msg = getattr(r, 'status', '')
152
        try:
153
            message = '%s %s\n' % (status_msg, r.text)
154
        except:
155
            message = '%s %s\n' % (status_msg, r)
156
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
157
        raise ClientError(message, status=status)
158

    
159
    def set_header(self, name, value, iff=True):
160
        """Set a header 'name':'value'"""
161
        if value is not None and iff:
162
            self.http_client.set_header(name, value)
163

    
164
    def set_param(self, name, value=None, iff=True):
165
        if iff:
166
            self.http_client.set_param(name, value)
167

    
168
    def set_default_header(self, name, value):
169
        self.http_client.headers.setdefault(name, value)
170

    
171
    def request(
172
            self,
173
            method,
174
            path,
175
            async_headers={},
176
            async_params={},
177
            **kwargs):
178
        """In threaded/asynchronous requests, headers and params are not safe
179
        Therefore, the standard self.set_header/param system can be used only
180
        for headers and params that are common for all requests. All other
181
        params and headers should passes as
182
        @param async_headers
183
        @async_params
184
        E.g. in most queries the 'X-Auth-Token' header might be the same for
185
        all, but the 'Range' header might be different from request to request.
186
        """
187
        try:
188
            success = kwargs.pop('success', 200)
189

    
190
            data = kwargs.pop('data', None)
191
            self.set_default_header('X-Auth-Token', self.token)
192

    
193
            if 'json' in kwargs:
194
                data = dumps(kwargs.pop('json'))
195
                self.set_default_header('Content-Type', 'application/json')
196
            if data:
197
                self.set_default_header('Content-Length', '%s' % len(data))
198

    
199
            sendlog.info('perform a %s @ %s', method, self.base_url)
200

    
201
            self.http_client.url = self.base_url
202
            self.http_client.path = quote(path.encode('utf8'))
203
            r = self.http_client.perform_request(
204
                method,
205
                data,
206
                async_headers,
207
                async_params)
208

    
209
            req = self.http_client
210
            sendlog.info('%s %s', method, req.url)
211
            headers = dict(req.headers)
212
            headers.update(async_headers)
213

    
214
            for key, val in headers.items():
215
                sendlog.info('\t%s: %s', key, val)
216
            sendlog.info('')
217
            if data:
218
                datasendlog.info(data)
219

    
220
            recvlog.info('%d %s', r.status_code, r.status)
221
            for key, val in r.headers.items():
222
                recvlog.info('%s: %s', key, val)
223
            if r.content:
224
                datarecvlog.info(r.content)
225

    
226
        except (KamakiResponseError, KamakiConnectionError) as err:
227
            from traceback import format_stack
228
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
229
            self.http_client.reset_headers()
230
            self.http_client.reset_params()
231
            errstr = '%s' % err
232
            if not errstr:
233
                errstr = ('%s' % type(err))[7:-2]
234
            status = getattr(err, 'status', getattr(err, 'errno', 0))
235
            raise ClientError('%s\n' % errstr, status=status)
236

    
237
        self.http_client.reset_headers()
238
        self.http_client.reset_params()
239

    
240
        if success is not None:
241
            # Success can either be an int or a collection
242
            success = (success,) if isinstance(success, int) else success
243
            if r.status_code not in success:
244
                r.release()
245
                self._raise_for_status(r)
246
        return r
247

    
248
    def delete(self, path, **kwargs):
249
        return self.request('delete', path, **kwargs)
250

    
251
    def get(self, path, **kwargs):
252
        return self.request('get', path, **kwargs)
253

    
254
    def head(self, path, **kwargs):
255
        return self.request('head', path, **kwargs)
256

    
257
    def post(self, path, **kwargs):
258
        return self.request('post', path, **kwargs)
259

    
260
    def put(self, path, **kwargs):
261
        return self.request('put', path, **kwargs)
262

    
263
    def copy(self, path, **kwargs):
264
        return self.request('copy', path, **kwargs)
265

    
266
    def move(self, path, **kwargs):
267
        return self.request('move', path, **kwargs)