Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 27abfa9f

History | View | Annotate | Download (9.4 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote
35
from threading import Thread
36
from json import dumps, loads
37
from time import time
38
import logging
39
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
40
from kamaki.clients.connection.errors import KamakiConnectionError
41
from kamaki.clients.connection.errors import KamakiResponseError
42

    
43
sendlog = logging.getLogger('clients.send')
44
datasendlog = logging.getLogger('data.send')
45
recvlog = logging.getLogger('clients.recv')
46
datarecvlog = logging.getLogger('data.recv')
47

    
48

    
49
class ClientError(Exception):
50
    def __init__(self, message, status=0, details=None):
51
        try:
52
            message += '' if message and message[-1] == '\n' else '\n'
53
            serv_stat, sep, new_msg = message.partition('{')
54
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
55
            json_msg = loads(new_msg)
56
            key = json_msg.keys()[0]
57
            serv_stat = serv_stat.strip()
58

    
59
            json_msg = json_msg[key]
60
            message = '%s %s (%s)\n' % (
61
                serv_stat,
62
                key,
63
                json_msg['message']) if (
64
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
65
            status = json_msg.get('code', status)
66
            if 'details' in json_msg:
67
                if not details:
68
                    details = []
69
                if not isinstance(details, list):
70
                    details = [details]
71
                if json_msg['details']:
72
                    details.append(json_msg['details'])
73
        except Exception:
74
            pass
75
        finally:
76
            super(ClientError, self).__init__(message)
77
            self.status = status if isinstance(status, int) else 0
78
            self.details = details if details else []
79

    
80

    
81
class SilentEvent(Thread):
82
    """ Thread-run method(*args, **kwargs)"""
83
    def __init__(self, method, *args, **kwargs):
84
        super(self.__class__, self).__init__()
85
        self.method = method
86
        self.args = args
87
        self.kwargs = kwargs
88

    
89
    @property
90
    def exception(self):
91
        return getattr(self, '_exception', False)
92

    
93
    @property
94
    def value(self):
95
        return getattr(self, '_value', None)
96

    
97
    def run(self):
98
        try:
99
            self._value = self.method(*(self.args), **(self.kwargs))
100
        except Exception as e:
101
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
102
                self,
103
                type(e),
104
                e.status if isinstance(e, ClientError) else '',
105
                e))
106
            self._exception = e
107

    
108

    
109
class Client(object):
110
    MAX_THREADS = 7
111

    
112
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
113
        self.base_url = base_url
114
        self.token = token
115
        self.headers = {}
116
        self.DATE_FORMATS = [
117
            '%a %b %d %H:%M:%S %Y',
118
            '%A, %d-%b-%y %H:%M:%S GMT',
119
            '%a, %d %b %Y %H:%M:%S GMT']
120
        self.http_client = http_client
121

    
122
    def _init_thread_limit(self, limit=1):
123
        self._thread_limit = limit
124
        self._elapsed_old = 0.0
125
        self._elapsed_new = 0.0
126

    
127
    def _watch_thread_limit(self, threadlist):
128
        recvlog.debug('# running threads: %s' % len(threadlist))
129
        if (self._elapsed_old > self._elapsed_new) and (
130
                self._thread_limit < self.MAX_THREADS):
131
            self._thread_limit += 1
132
        elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
133
            self._thread_limit -= 1
134

    
135
        self._elapsed_old = self._elapsed_new
136
        if len(threadlist) >= self._thread_limit:
137
            self._elapsed_new = 0.0
138
            for thread in threadlist:
139
                begin_time = time()
140
                thread.join()
141
                self._elapsed_new += time() - begin_time
142
            self._elapsed_new = self._elapsed_new / len(threadlist)
143
            return []
144
        return threadlist
145

    
146
    def _raise_for_status(self, r):
147
        status_msg = getattr(r, 'status', '')
148
        try:
149
            message = '%s %s\n' % (status_msg, r.text)
150
        except:
151
            message = '%s %s\n' % (status_msg, r)
152
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
153
        raise ClientError(message, status=status)
154

    
155
    def set_header(self, name, value, iff=True):
156
        """Set a header 'name':'value'"""
157
        if value is not None and iff:
158
            self.http_client.set_header(name, value)
159

    
160
    def set_param(self, name, value=None, iff=True):
161
        if iff:
162
            self.http_client.set_param(name, value)
163

    
164
    def set_default_header(self, name, value):
165
        self.http_client.headers.setdefault(name, value)
166

    
167
    def request(
168
            self,
169
            method,
170
            path,
171
            async_headers={},
172
            async_params={},
173
            **kwargs):
174
        """In threaded/asynchronous requests, headers and params are not safe
175
        Therefore, the standard self.set_header/param system can be used only
176
        for headers and params that are common for all requests. All other
177
        params and headers should passes as
178
        @param async_headers
179
        @async_params
180
        E.g. in most queries the 'X-Auth-Token' header might be the same for
181
        all, but the 'Range' header might be different from request to request.
182
        """
183
        try:
184
            success = kwargs.pop('success', 200)
185

    
186
            data = kwargs.pop('data', None)
187
            self.set_default_header('X-Auth-Token', self.token)
188

    
189
            if 'json' in kwargs:
190
                data = dumps(kwargs.pop('json'))
191
                self.set_default_header('Content-Type', 'application/json')
192
            if data:
193
                self.set_default_header('Content-Length', '%s' % len(data))
194

    
195
            sendlog.info('perform a %s @ %s', method, self.base_url)
196

    
197
            self.http_client.url = self.base_url
198
            self.http_client.path = quote(path.encode('utf8'))
199
            r = self.http_client.perform_request(
200
                method,
201
                data,
202
                async_headers,
203
                async_params)
204

    
205
            req = self.http_client
206
            sendlog.info('%s %s', method, req.url)
207
            headers = dict(req.headers)
208
            headers.update(async_headers)
209

    
210
            for key, val in headers.items():
211
                sendlog.info('\t%s: %s', key, val)
212
            sendlog.info('')
213
            if data:
214
                datasendlog.info(data)
215

    
216
            recvlog.info('%d %s', r.status_code, r.status)
217
            for key, val in r.headers.items():
218
                recvlog.info('%s: %s', key, val)
219
            if r.content:
220
                datarecvlog.info(r.content)
221

    
222
        except (KamakiResponseError, KamakiConnectionError) as err:
223
            from traceback import format_stack
224
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
225
            self.http_client.reset_headers()
226
            self.http_client.reset_params()
227
            errstr = '%s' % err
228
            if not errstr:
229
                errstr = ('%s' % type(err))[7:-2]
230
            status = getattr(err, 'status', getattr(err, 'errno', 0))
231
            raise ClientError('%s\n' % errstr, status=status)
232

    
233
        self.http_client.reset_headers()
234
        self.http_client.reset_params()
235

    
236
        if success is not None:
237
            # Success can either be an int or a collection
238
            success = (success,) if isinstance(success, int) else success
239
            if r.status_code not in success:
240
                r.release()
241
                self._raise_for_status(r)
242
        return r
243

    
244
    def delete(self, path, **kwargs):
245
        return self.request('delete', path, **kwargs)
246

    
247
    def get(self, path, **kwargs):
248
        return self.request('get', path, **kwargs)
249

    
250
    def head(self, path, **kwargs):
251
        return self.request('head', path, **kwargs)
252

    
253
    def post(self, path, **kwargs):
254
        return self.request('post', path, **kwargs)
255

    
256
    def put(self, path, **kwargs):
257
        return self.request('put', path, **kwargs)
258

    
259
    def copy(self, path, **kwargs):
260
        return self.request('copy', path, **kwargs)
261

    
262
    def move(self, path, **kwargs):
263
        return self.request('move', path, **kwargs)