Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 42115b51

History | View | Annotate | Download (10.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote
35
from threading import Thread
36
from json import dumps, loads
37
from time import time
38

    
39
from kamaki.clients.utils import logger
40
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
41
from kamaki.clients.connection.errors import KamakiConnectionError
42
from kamaki.clients.connection.errors import KamakiResponseError
43

    
44
LOG_TOKEN = False
45
DEBUG_LOG = logger.get_log_filename()
46

    
47
logger.add_file_logger('clients.send', __name__, filename=DEBUG_LOG)
48
sendlog = logger.get_logger('clients.send')
49
sendlog.debug('Logging location: %s' % DEBUG_LOG)
50
logger.add_file_logger('data.send', __name__, filename=DEBUG_LOG)
51
datasendlog = logger.get_logger('data.send')
52
logger.add_file_logger('clients.recv', __name__, filename=DEBUG_LOG)
53
recvlog = logger.get_logger('clients.recv')
54
logger.add_file_logger('data.recv', __name__, filename=DEBUG_LOG)
55
datarecvlog = logger.get_logger('data.recv')
56

    
57

    
58
class ClientError(Exception):
59
    def __init__(self, message, status=0, details=None):
60
        try:
61
            message += '' if message and message[-1] == '\n' else '\n'
62
            serv_stat, sep, new_msg = message.partition('{')
63
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
64
            json_msg = loads(new_msg)
65
            key = json_msg.keys()[0]
66
            serv_stat = serv_stat.strip()
67

    
68
            json_msg = json_msg[key]
69
            message = '%s %s (%s)\n' % (
70
                serv_stat,
71
                key,
72
                json_msg['message']) if (
73
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
74
            status = json_msg.get('code', status)
75
            if 'details' in json_msg:
76
                if not details:
77
                    details = []
78
                if not isinstance(details, list):
79
                    details = [details]
80
                if json_msg['details']:
81
                    details.append(json_msg['details'])
82
        except Exception:
83
            pass
84
        finally:
85
            while message.endswith('\n\n'):
86
                message = message[:-1]
87
            super(ClientError, self).__init__(message)
88
            self.status = status if isinstance(status, int) else 0
89
            self.details = details if details else []
90

    
91

    
92
class SilentEvent(Thread):
93
    """ Thread-run method(*args, **kwargs)"""
94
    def __init__(self, method, *args, **kwargs):
95
        super(self.__class__, self).__init__()
96
        self.method = method
97
        self.args = args
98
        self.kwargs = kwargs
99

    
100
    @property
101
    def exception(self):
102
        return getattr(self, '_exception', False)
103

    
104
    @property
105
    def value(self):
106
        return getattr(self, '_value', None)
107

    
108
    def run(self):
109
        try:
110
            self._value = self.method(*(self.args), **(self.kwargs))
111
        except Exception as e:
112
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
113
                self,
114
                type(e),
115
                e.status if isinstance(e, ClientError) else '',
116
                e))
117
            self._exception = e
118

    
119

    
120
class Client(object):
121

    
122
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
123
        self.base_url = base_url
124
        self.token = token
125
        self.headers = {}
126
        self.DATE_FORMATS = [
127
            '%a %b %d %H:%M:%S %Y',
128
            '%A, %d-%b-%y %H:%M:%S GMT',
129
            '%a, %d %b %Y %H:%M:%S GMT']
130
        self.http_client = http_client
131
        self.MAX_THREADS = 7
132

    
133
    def _init_thread_limit(self, limit=1):
134
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
135
        self._thread_limit = limit
136
        self._elapsed_old = 0.0
137
        self._elapsed_new = 0.0
138

    
139
    def _watch_thread_limit(self, threadlist):
140
        self._thread_limit = getattr(self, '_thread_limit', 1)
141
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
142
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
143
        recvlog.debug('# running threads: %s' % len(threadlist))
144
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
145
                self._thread_limit < self.MAX_THREADS):
146
            self._thread_limit += 1
147
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
148
            self._thread_limit -= 1
149

    
150
        self._elapsed_old = self._elapsed_new
151
        if len(threadlist) >= self._thread_limit:
152
            self._elapsed_new = 0.0
153
            for thread in threadlist:
154
                begin_time = time()
155
                thread.join()
156
                self._elapsed_new += time() - begin_time
157
            self._elapsed_new = self._elapsed_new / len(threadlist)
158
            return []
159
        return threadlist
160

    
161
    def _raise_for_status(self, r):
162
        status_msg = getattr(r, 'status', None) or ''
163
        try:
164
            message = '%s %s\n' % (status_msg, r.text)
165
        except:
166
            message = '%s %s\n' % (status_msg, r)
167
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
168
        raise ClientError(message, status=status)
169

    
170
    def set_header(self, name, value, iff=True):
171
        """Set a header 'name':'value'"""
172
        if value is not None and iff:
173
            self.http_client.set_header(name, value)
174

    
175
    def set_param(self, name, value=None, iff=True):
176
        if iff:
177
            self.http_client.set_param(name, value)
178

    
179
    def request(
180
            self,
181
            method,
182
            path,
183
            async_headers={},
184
            async_params={},
185
            **kwargs):
186
        """In threaded/asynchronous requests, headers and params are not safe
187
        Therefore, the standard self.set_header/param system can be used only
188
        for headers and params that are common for all requests. All other
189
        params and headers should passes as
190
        @param async_headers
191
        @async_params
192
        E.g. in most queries the 'X-Auth-Token' header might be the same for
193
        all, but the 'Range' header might be different from request to request.
194
        """
195
        assert isinstance(method, str) or isinstance(method, unicode)
196
        assert method
197
        assert isinstance(path, str) or isinstance(path, unicode)
198
        try:
199
            success = kwargs.pop('success', 200)
200
            data = kwargs.pop('data', None)
201
            self.http_client.headers.setdefault('X-Auth-Token', self.token)
202

    
203
            if 'json' in kwargs:
204
                data = dumps(kwargs.pop('json'))
205
                self.http_client.headers.setdefault(
206
                    'Content-Type',
207
                    'application/json')
208
            if data:
209
                self.http_client.headers.setdefault(
210
                    'Content-Length',
211
                    '%s' % len(data))
212

    
213
            sendlog.info('perform a %s @ %s', method, self.base_url)
214

    
215
            self.http_client.url = self.base_url
216
            self.http_client.path = quote(path.encode('utf8'))
217
            r = self.http_client.perform_request(
218
                method,
219
                data,
220
                async_headers,
221
                async_params)
222

    
223
            req = self.http_client
224
            sendlog.info('%s %s', method, req.url)
225
            headers = dict(req.headers)
226
            headers.update(async_headers)
227

    
228
            for key, val in headers.items():
229
                if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
230
                    continue
231
                sendlog.info('\t%s: %s', key, val)
232
            sendlog.info('')
233
            if data:
234
                datasendlog.info(data)
235

    
236
            recvlog.info('%d %s', r.status_code, r.status)
237
            for key, val in r.headers.items():
238
                if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
239
                    continue
240
                recvlog.info('%s: %s', key, val)
241
            if r.content:
242
                datarecvlog.info(r.content)
243

    
244
        except (KamakiResponseError, KamakiConnectionError) as err:
245
            from traceback import format_stack
246
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
247
            self.http_client.reset_headers()
248
            self.http_client.reset_params()
249
            errstr = '%s' % err
250
            if not errstr:
251
                errstr = ('%s' % type(err))[7:-2]
252
            status = getattr(err, 'status', getattr(err, 'errno', 0))
253
            raise ClientError('%s\n' % errstr, status=status)
254
        finally:
255
            self.http_client.reset_headers()
256
            self.http_client.reset_params()
257

    
258
        if success is not None:
259
            # Success can either be an int or a collection
260
            success = (success,) if isinstance(success, int) else success
261
            if r.status_code not in success:
262
                r.release()
263
                self._raise_for_status(r)
264
        return r
265

    
266
    def delete(self, path, **kwargs):
267
        return self.request('delete', path, **kwargs)
268

    
269
    def get(self, path, **kwargs):
270
        return self.request('get', path, **kwargs)
271

    
272
    def head(self, path, **kwargs):
273
        return self.request('head', path, **kwargs)
274

    
275
    def post(self, path, **kwargs):
276
        return self.request('post', path, **kwargs)
277

    
278
    def put(self, path, **kwargs):
279
        return self.request('put', path, **kwargs)
280

    
281
    def copy(self, path, **kwargs):
282
        return self.request('copy', path, **kwargs)
283

    
284
    def move(self, path, **kwargs):
285
        return self.request('move', path, **kwargs)