Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 6a6175c0

History | View | Annotate | Download (10 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote
35
from threading import Thread
36
from json import dumps, loads
37
from time import time
38
import logging
39
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
40
from kamaki.clients.connection.errors import KamakiConnectionError
41
from kamaki.clients.connection.errors import KamakiResponseError
42

    
43
sendlog = logging.getLogger('clients.send')
44
datasendlog = logging.getLogger('data.send')
45
recvlog = logging.getLogger('clients.recv')
46
datarecvlog = logging.getLogger('data.recv')
47

    
48

    
49
class ClientError(Exception):
50
    def __init__(self, message, status=0, details=None):
51
        try:
52
            message += '' if message and message[-1] == '\n' else '\n'
53
            serv_stat, sep, new_msg = message.partition('{')
54
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
55
            json_msg = loads(new_msg)
56
            key = json_msg.keys()[0]
57
            serv_stat = serv_stat.strip()
58

    
59
            json_msg = json_msg[key]
60
            message = '%s %s (%s)\n' % (
61
                serv_stat,
62
                key,
63
                json_msg['message']) if (
64
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
65
            status = json_msg.get('code', status)
66
            if 'details' in json_msg:
67
                if not details:
68
                    details = []
69
                if not isinstance(details, list):
70
                    details = [details]
71
                if json_msg['details']:
72
                    details.append(json_msg['details'])
73
        except Exception:
74
            pass
75
        finally:
76
            while message.endswith('\n\n'):
77
                message = message[:-1]
78
            super(ClientError, self).__init__(message)
79
            self.status = status if isinstance(status, int) else 0
80
            self.details = details if details else []
81

    
82

    
83
class SilentEvent(Thread):
84
    """ Thread-run method(*args, **kwargs)"""
85
    def __init__(self, method, *args, **kwargs):
86
        super(self.__class__, self).__init__()
87
        self.method = method
88
        self.args = args
89
        self.kwargs = kwargs
90

    
91
    @property
92
    def exception(self):
93
        return getattr(self, '_exception', False)
94

    
95
    @property
96
    def value(self):
97
        return getattr(self, '_value', None)
98

    
99
    def run(self):
100
        try:
101
            self._value = self.method(*(self.args), **(self.kwargs))
102
        except Exception as e:
103
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
104
                self,
105
                type(e),
106
                e.status if isinstance(e, ClientError) else '',
107
                e))
108
            self._exception = e
109

    
110

    
111
class Client(object):
112

    
113
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
114
        self.base_url = base_url
115
        self.token = token
116
        self.headers = {}
117
        self.DATE_FORMATS = [
118
            '%a %b %d %H:%M:%S %Y',
119
            '%A, %d-%b-%y %H:%M:%S GMT',
120
            '%a, %d %b %Y %H:%M:%S GMT']
121
        self.http_client = http_client
122
        self.MAX_THREADS = 7
123

    
124
    def _init_thread_limit(self, limit=1):
125
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
126
        self._thread_limit = limit
127
        self._elapsed_old = 0.0
128
        self._elapsed_new = 0.0
129

    
130
    def _watch_thread_limit(self, threadlist):
131
        self._thread_limit = getattr(self, '_thread_limit', 1)
132
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
133
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
134
        recvlog.debug('# running threads: %s' % len(threadlist))
135
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
136
                self._thread_limit < self.MAX_THREADS):
137
            self._thread_limit += 1
138
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
139
            self._thread_limit -= 1
140

    
141
        self._elapsed_old = self._elapsed_new
142
        if len(threadlist) >= self._thread_limit:
143
            self._elapsed_new = 0.0
144
            for thread in threadlist:
145
                begin_time = time()
146
                thread.join()
147
                self._elapsed_new += time() - begin_time
148
            self._elapsed_new = self._elapsed_new / len(threadlist)
149
            return []
150
        return threadlist
151

    
152
    def _raise_for_status(self, r):
153
        status_msg = getattr(r, 'status', None) or ''
154
        try:
155
            message = '%s %s\n' % (status_msg, r.text)
156
        except:
157
            message = '%s %s\n' % (status_msg, r)
158
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
159
        raise ClientError(message, status=status)
160

    
161
    def set_header(self, name, value, iff=True):
162
        """Set a header 'name':'value'"""
163
        if value is not None and iff:
164
            self.http_client.set_header(name, value)
165

    
166
    def set_param(self, name, value=None, iff=True):
167
        if iff:
168
            self.http_client.set_param(name, value)
169

    
170
    def request(
171
            self,
172
            method,
173
            path,
174
            async_headers={},
175
            async_params={},
176
            **kwargs):
177
        """In threaded/asynchronous requests, headers and params are not safe
178
        Therefore, the standard self.set_header/param system can be used only
179
        for headers and params that are common for all requests. All other
180
        params and headers should passes as
181
        @param async_headers
182
        @async_params
183
        E.g. in most queries the 'X-Auth-Token' header might be the same for
184
        all, but the 'Range' header might be different from request to request.
185
        """
186
        assert isinstance(method, str) or isinstance(method, unicode)
187
        assert method
188
        assert isinstance(path, str) or isinstance(path, unicode)
189
        assert path
190
        try:
191
            success = kwargs.pop('success', 200)
192
            data = kwargs.pop('data', None)
193
            self.http_client.headers.setdefault('X-Auth-Token', self.token)
194

    
195
            if 'json' in kwargs:
196
                data = dumps(kwargs.pop('json'))
197
                self.http_client.headers.setdefault(
198
                    'Content-Type',
199
                    'application/json')
200
            if data:
201
                self.http_client.headers.setdefault(
202
                    'Content-Length',
203
                    '%s' % len(data))
204

    
205
            sendlog.info('perform a %s @ %s', method, self.base_url)
206

    
207
            self.http_client.url = self.base_url
208
            self.http_client.path = quote(path.encode('utf8'))
209
            r = self.http_client.perform_request(
210
                method,
211
                data,
212
                async_headers,
213
                async_params)
214

    
215
            req = self.http_client
216
            sendlog.info('%s %s', method, req.url)
217
            headers = dict(req.headers)
218
            headers.update(async_headers)
219

    
220
            for key, val in headers.items():
221
                sendlog.info('\t%s: %s', key, val)
222
            sendlog.info('')
223
            if data:
224
                datasendlog.info(data)
225

    
226
            recvlog.info('%d %s', r.status_code, r.status)
227
            for key, val in r.headers.items():
228
                recvlog.info('%s: %s', key, val)
229
            if r.content:
230
                datarecvlog.info(r.content)
231

    
232
        except (KamakiResponseError, KamakiConnectionError) as err:
233
            from traceback import format_stack
234
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
235
            self.http_client.reset_headers()
236
            self.http_client.reset_params()
237
            errstr = '%s' % err
238
            if not errstr:
239
                errstr = ('%s' % type(err))[7:-2]
240
            status = getattr(err, 'status', getattr(err, 'errno', 0))
241
            raise ClientError('%s\n' % errstr, status=status)
242
        finally:
243
            self.http_client.reset_headers()
244
            self.http_client.reset_params()
245

    
246
        if success is not None:
247
            # Success can either be an int or a collection
248
            success = (success,) if isinstance(success, int) else success
249
            if r.status_code not in success:
250
                r.release()
251
                self._raise_for_status(r)
252
        return r
253

    
254
    def delete(self, path, **kwargs):
255
        return self.request('delete', path, **kwargs)
256

    
257
    def get(self, path, **kwargs):
258
        return self.request('get', path, **kwargs)
259

    
260
    def head(self, path, **kwargs):
261
        return self.request('head', path, **kwargs)
262

    
263
    def post(self, path, **kwargs):
264
        return self.request('post', path, **kwargs)
265

    
266
    def put(self, path, **kwargs):
267
        return self.request('put', path, **kwargs)
268

    
269
    def copy(self, path, **kwargs):
270
        return self.request('copy', path, **kwargs)
271

    
272
    def move(self, path, **kwargs):
273
        return self.request('move', path, **kwargs)