Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 6069b53b

History | View | Annotate | Download (8.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import Thread
35
from json import dumps, loads
36
from time import time
37
import logging
38
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
39
from kamaki.clients.connection.errors import HTTPConnectionError
40
from kamaki.clients.connection.errors import HTTPResponseError
41

    
42
sendlog = logging.getLogger('clients.send')
43
recvlog = logging.getLogger('clients.recv')
44

    
45

    
46
class ClientError(Exception):
47
    def __init__(self, message, status=0, details=[]):
48
        try:
49
            message += '' if message and message[-1] == '\n' else '\n'
50
            serv_stat, sep, new_msg = message.partition('{')
51
            new_msg = sep + new_msg
52
            json_msg = loads(new_msg)
53
            key = json_msg.keys()[0]
54

    
55
            json_msg = json_msg[key]
56
            message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
57
                if 'message' in json_msg else '%s %s' % (serv_stat, key)
58
            if 'code' in json_msg:
59
                status = json_msg['code']
60
            if 'details' in json_msg:
61
                if not details:
62
                    details = []
63
                elif not isinstance(details, list):
64
                    details = [details]
65
                if json_msg['details']:
66
                    details.append(json_msg['details'])
67
        except:
68
            pass
69

    
70
        super(ClientError, self).__init__(message)
71
        self.status = status
72
        self.details = details
73

    
74

    
75
class SilentEvent(Thread):
76
    """ Thread-run method(*args, **kwargs)
77
        put exception in exception_bucket
78
    """
79
    def __init__(self, method, *args, **kwargs):
80
        super(self.__class__, self).__init__()
81
        self.method = method
82
        self.args = args
83
        self.kwargs = kwargs
84

    
85
    @property
86
    def exception(self):
87
        return getattr(self, '_exception', False)
88

    
89
    @property
90
    def value(self):
91
        return getattr(self, '_value', None)
92

    
93
    def run(self):
94
        try:
95
            self._value = self.method(*(self.args), **(self.kwargs))
96
        except Exception as e:
97
            print('______\n%s\n_______' % e)
98
            self._exception = e
99

    
100

    
101
class Client(object):
102
    POOL_SIZE = 7
103

    
104
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
105
        self.base_url = base_url
106
        self.token = token
107
        self.headers = {}
108
        self.DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
109
            "%A, %d-%b-%y %H:%M:%S GMT",
110
            "%a, %d %b %Y %H:%M:%S GMT"]
111
        self.http_client = http_client
112

    
113
    def _init_thread_limit(self, limit=1):
114
        self._thread_limit = limit
115
        self._elapsed_old = 0.0
116
        self._elapsed_new = 0.0
117

    
118
    def _watch_thread_limit(self, threadlist):
119
        if self._elapsed_old > self._elapsed_new\
120
        and self._thread_limit < self.POOL_SIZE:
121
            self._thread_limit += 1
122
        elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
123
            self._thread_limit -= 1
124

    
125
        self._elapsed_old = self._elapsed_new
126
        if len(threadlist) >= self._thread_limit:
127
            self._elapsed_new = 0.0
128
            for thread in threadlist:
129
                begin_time = time()
130
                thread.join()
131
                self._elapsed_new += time() - begin_time
132
            self._elapsed_new = self._elapsed_new / len(threadlist)
133
            return []
134
        return threadlist
135

    
136
    def _raise_for_status(self, r):
137
        status_msg = getattr(r, 'status', '')
138
        try:
139
            message = '%s %s\n' % (status_msg, r.text)
140
        except:
141
            message = '%s %s\n' % (status_msg, r)
142
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
143
        raise ClientError(message, status=status)
144

    
145
    def set_header(self, name, value, iff=True):
146
        """Set a header 'name':'value'"""
147
        if value is not None and iff:
148
            self.http_client.set_header(name, value)
149

    
150
    def set_param(self, name, value=None, iff=True):
151
        if iff:
152
            self.http_client.set_param(name, value)
153

    
154
    def set_default_header(self, name, value):
155
        self.http_client.headers.setdefault(name, value)
156

    
157
    def request(self,
158
        method,
159
        path,
160
        async_headers={},
161
        async_params={},
162
        **kwargs):
163
        """In threaded/asynchronous requests, headers and params are not safe
164
        Therefore, the standard self.set_header/param system can be used only
165
        for headers and params that are common for all requests. All other
166
        params and headers should passes as
167
        @param async_headers
168
        @async_params
169
        E.g. in most queries the 'X-Auth-Token' header might be the same for
170
        all, but the 'Range' header might be different from request to request.
171
        """
172

    
173
        try:
174
            success = kwargs.pop('success', 200)
175

    
176
            data = kwargs.pop('data', None)
177
            self.set_default_header('X-Auth-Token', self.token)
178

    
179
            if 'json' in kwargs:
180
                data = dumps(kwargs.pop('json'))
181
                self.set_default_header('Content-Type', 'application/json')
182
            if data:
183
                self.set_default_header('Content-Length', unicode(len(data)))
184

    
185
            self.http_client.url = self.base_url
186
            self.http_client.path = path
187
            r = self.http_client.perform_request(method,
188
                data,
189
                async_headers,
190
                async_params)
191

    
192
            req = self.http_client
193
            sendlog.info('%s %s', method, req.url)
194
            headers = dict(req.headers)
195
            headers.update(async_headers)
196

    
197
            for key, val in headers.items():
198
                sendlog.info('\t%s: %s', key, val)
199
            sendlog.info('')
200
            if data:
201
                sendlog.info('%s', data)
202

    
203
            recvlog.info('%d %s', r.status_code, r.status)
204
            for key, val in r.headers.items():
205
                recvlog.info('%s: %s', key, val)
206
            if r.content:
207
                recvlog.debug(r.content)
208

    
209
        except (HTTPResponseError, HTTPConnectionError) as err:
210
            from traceback import format_stack
211
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
212
            self.http_client.reset_headers()
213
            self.http_client.reset_params()
214
            errstr = '%s' % err
215
            if not errstr:
216
                errstr = ('%s' % type(err))[7:-2]
217
            raise ClientError('%s\n' % errstr,
218
                status=getattr(err, 'status', 0))
219

    
220
        self.http_client.reset_headers()
221
        self.http_client.reset_params()
222

    
223
        if success is not None:
224
            # Success can either be an in or a collection
225
            success = (success,) if isinstance(success, int) else success
226
            if r.status_code not in success:
227
                r.release()
228
                self._raise_for_status(r)
229
        return r
230

    
231
    def delete(self, path, **kwargs):
232
        return self.request('delete', path, **kwargs)
233

    
234
    def get(self, path, **kwargs):
235
        return self.request('get', path, **kwargs)
236

    
237
    def head(self, path, **kwargs):
238
        return self.request('head', path, **kwargs)
239

    
240
    def post(self, path, **kwargs):
241
        return self.request('post', path, **kwargs)
242

    
243
    def put(self, path, **kwargs):
244
        return self.request('put', path, **kwargs)
245

    
246
    def copy(self, path, **kwargs):
247
        return self.request('copy', path, **kwargs)
248

    
249
    def move(self, path, **kwargs):
250
        return self.request('move', path, **kwargs)