Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ c1004a00

History | View | Annotate | Download (9.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote
35
from threading import Thread
36
from json import dumps, loads
37
from time import time
38
import logging
39
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
40
from kamaki.clients.connection.errors import KamakiConnectionError
41
from kamaki.clients.connection.errors import KamakiResponseError
42

    
43
sendlog = logging.getLogger('clients.send')
44
datasendlog = logging.getLogger('data.send')
45
recvlog = logging.getLogger('clients.recv')
46
datarecvlog = logging.getLogger('data.recv')
47

    
48

    
49
class ClientError(Exception):
50
    def __init__(self, message, status=0, details=None):
51
        try:
52
            message += '' if message and message[-1] == '\n' else '\n'
53
            serv_stat, sep, new_msg = message.partition('{')
54
            new_msg = sep + new_msg
55
            json_msg = loads(new_msg)
56
            key = json_msg.keys()[0]
57

    
58
            json_msg = json_msg[key]
59
            message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
60
                if 'message' in json_msg else '%s %s' % (serv_stat, key)
61
            if 'code' in json_msg:
62
                status = json_msg['code']
63
            if 'details' in json_msg:
64
                if not details:
65
                    details = []
66
                elif not isinstance(details, list):
67
                    details = [details]
68
                if json_msg['details']:
69
                    details.append(json_msg['details'])
70
        except:
71
            pass
72

    
73
        super(ClientError, self).__init__(message)
74
        self.status = status
75
        self.details = details if details else []
76

    
77

    
78
class SilentEvent(Thread):
79
    """ Thread-run method(*args, **kwargs)"""
80
    def __init__(self, method, *args, **kwargs):
81
        super(self.__class__, self).__init__()
82
        self.method = method
83
        self.args = args
84
        self.kwargs = kwargs
85

    
86
    @property
87
    def exception(self):
88
        return getattr(self, '_exception', False)
89

    
90
    @property
91
    def value(self):
92
        return getattr(self, '_value', None)
93

    
94
    def run(self):
95
        try:
96
            self._value = self.method(*(self.args), **(self.kwargs))
97
        except Exception as e:
98
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
99
                self,
100
                type(e),
101
                e.status if isinstance(e, ClientError) else '',
102
                e))
103
            self._exception = e
104

    
105

    
106
class Client(object):
107
    POOL_SIZE = 7
108

    
109
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
110
        self.base_url = base_url
111
        self.token = token
112
        self.headers = {}
113
        self.DATE_FORMATS = [
114
            '%a %b %d %H:%M:%S %Y',
115
            '%A, %d-%b-%y %H:%M:%S GMT',
116
            '%a, %d %b %Y %H:%M:%S GMT']
117
        self.http_client = http_client
118

    
119
    def _init_thread_limit(self, limit=1):
120
        self._thread_limit = limit
121
        self._elapsed_old = 0.0
122
        self._elapsed_new = 0.0
123

    
124
    def _watch_thread_limit(self, threadlist):
125
        recvlog.debug('# running threads: %s' % len(threadlist))
126
        if (self._elapsed_old > self._elapsed_new) and (
127
                self._thread_limit < self.POOL_SIZE):
128
            self._thread_limit += 1
129
        elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
130
            self._thread_limit -= 1
131

    
132
        self._elapsed_old = self._elapsed_new
133
        if len(threadlist) >= self._thread_limit:
134
            self._elapsed_new = 0.0
135
            for thread in threadlist:
136
                begin_time = time()
137
                thread.join()
138
                self._elapsed_new += time() - begin_time
139
            self._elapsed_new = self._elapsed_new / len(threadlist)
140
            return []
141
        return threadlist
142

    
143
    def _raise_for_status(self, r):
144
        status_msg = getattr(r, 'status', '')
145
        try:
146
            message = '%s %s\n' % (status_msg, r.text)
147
        except:
148
            message = '%s %s\n' % (status_msg, r)
149
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
150
        raise ClientError(message, status=status)
151

    
152
    def set_header(self, name, value, iff=True):
153
        """Set a header 'name':'value'"""
154
        if value is not None and iff:
155
            self.http_client.set_header(name, value)
156

    
157
    def set_param(self, name, value=None, iff=True):
158
        if iff:
159
            self.http_client.set_param(name, value)
160

    
161
    def set_default_header(self, name, value):
162
        self.http_client.headers.setdefault(name, value)
163

    
164
    def request(
165
            self,
166
            method,
167
            path,
168
            async_headers={},
169
            async_params={},
170
            **kwargs):
171
        """In threaded/asynchronous requests, headers and params are not safe
172
        Therefore, the standard self.set_header/param system can be used only
173
        for headers and params that are common for all requests. All other
174
        params and headers should passes as
175
        @param async_headers
176
        @async_params
177
        E.g. in most queries the 'X-Auth-Token' header might be the same for
178
        all, but the 'Range' header might be different from request to request.
179
        """
180
        try:
181
            success = kwargs.pop('success', 200)
182

    
183
            data = kwargs.pop('data', None)
184
            self.set_default_header('X-Auth-Token', self.token)
185

    
186
            if 'json' in kwargs:
187
                data = dumps(kwargs.pop('json'))
188
                self.set_default_header('Content-Type', 'application/json')
189
            if data:
190
                self.set_default_header('Content-Length', '%s' % len(data))
191

    
192
            sendlog.info('perform a %s @ %s', method, self.base_url)
193

    
194
            self.http_client.url = self.base_url
195
            self.http_client.path = quote(path.encode('utf8'))
196
            r = self.http_client.perform_request(
197
                method,
198
                data,
199
                async_headers,
200
                async_params)
201

    
202
            req = self.http_client
203
            sendlog.info('%s %s', method, req.url)
204
            headers = dict(req.headers)
205
            headers.update(async_headers)
206

    
207
            for key, val in headers.items():
208
                sendlog.info('\t%s: %s', key, val)
209
            sendlog.info('')
210
            if data:
211
                datasendlog.info(data)
212

    
213
            recvlog.info('%d %s', r.status_code, r.status)
214
            for key, val in r.headers.items():
215
                recvlog.info('%s: %s', key, val)
216
            if r.content:
217
                datarecvlog.info(r.content)
218

    
219
        except (KamakiResponseError, KamakiConnectionError) as err:
220
            from traceback import format_stack
221
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
222
            self.http_client.reset_headers()
223
            self.http_client.reset_params()
224
            errstr = '%s' % err
225
            if not errstr:
226
                errstr = ('%s' % type(err))[7:-2]
227
            status = getattr(err, 'status', getattr(err, 'errno', 0))
228
            raise ClientError('%s\n' % errstr, status=status)
229

    
230
        self.http_client.reset_headers()
231
        self.http_client.reset_params()
232

    
233
        if success is not None:
234
            # Success can either be an int or a collection
235
            success = (success,) if isinstance(success, int) else success
236
            if r.status_code not in success:
237
                r.release()
238
                self._raise_for_status(r)
239
        return r
240

    
241
    def delete(self, path, **kwargs):
242
        return self.request('delete', path, **kwargs)
243

    
244
    def get(self, path, **kwargs):
245
        return self.request('get', path, **kwargs)
246

    
247
    def head(self, path, **kwargs):
248
        return self.request('head', path, **kwargs)
249

    
250
    def post(self, path, **kwargs):
251
        return self.request('post', path, **kwargs)
252

    
253
    def put(self, path, **kwargs):
254
        return self.request('put', path, **kwargs)
255

    
256
    def copy(self, path, **kwargs):
257
        return self.request('copy', path, **kwargs)
258

    
259
    def move(self, path, **kwargs):
260
        return self.request('move', path, **kwargs)