Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ c2b5da2f

History | View | Annotate | Download (14.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady
40
from time import sleep
41
from random import random
42

    
43
from objpool.http import PooledHTTPConnection
44

    
45
from kamaki.clients.utils import logger
46

    
47
LOG_TOKEN = False
48
DEBUG_LOG = logger.get_log_filename()
49

    
50
logger.add_file_logger('clients.send', __name__, filename=DEBUG_LOG)
51
sendlog = logger.get_logger('clients.send')
52
sendlog.debug('Logging location: %s' % DEBUG_LOG)
53

    
54
logger.add_file_logger('data.send', __name__, filename=DEBUG_LOG)
55
datasendlog = logger.get_logger('data.send')
56

    
57
logger.add_file_logger('clients.recv', __name__, filename=DEBUG_LOG)
58
recvlog = logger.get_logger('clients.recv')
59

    
60
logger.add_file_logger('data.recv', __name__, filename=DEBUG_LOG)
61
datarecvlog = logger.get_logger('data.recv')
62

    
63
logger.add_file_logger('ClientError', __name__, filename=DEBUG_LOG)
64
clienterrorlog = logger.get_logger('ClientError')
65

    
66
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
67

    
68

    
69
def _encode(v):
70
    if v and isinstance(v, unicode):
71
        return quote(v.encode('utf-8'))
72
    return v
73

    
74

    
75
class ClientError(Exception):
76
    def __init__(self, message, status=0, details=None):
77
        clienterrorlog.debug(
78
            'msg[%s], sts[%s], dtl[%s]' % (message, status, details))
79
        try:
80
            message += '' if message and message[-1] == '\n' else '\n'
81
            serv_stat, sep, new_msg = message.partition('{')
82
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
83
            json_msg = loads(new_msg)
84
            key = json_msg.keys()[0]
85
            serv_stat = serv_stat.strip()
86

    
87
            json_msg = json_msg[key]
88
            message = '%s %s (%s)\n' % (
89
                serv_stat,
90
                key,
91
                json_msg['message']) if (
92
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
93
            status = json_msg.get('code', status)
94
            if 'details' in json_msg:
95
                if not details:
96
                    details = []
97
                if not isinstance(details, list):
98
                    details = [details]
99
                if json_msg['details']:
100
                    details.append(json_msg['details'])
101
        except Exception:
102
            pass
103
        finally:
104
            while message.endswith('\n\n'):
105
                message = message[:-1]
106
            super(ClientError, self).__init__(message)
107
            self.status = status if isinstance(status, int) else 0
108
            self.details = details if details else []
109

    
110

    
111
class RequestManager(object):
112
    """Handle http request information"""
113

    
114
    def _connection_info(self, url, path, params={}):
115
        """ Set self.url to scheme://netloc/?params
116
        :param url: (str or unicode) The service url
117

118
        :param path: (str or unicode) The service path (url/path)
119

120
        :param params: (dict) Parameters to add to final url
121

122
        :returns: (scheme, netloc)
123
        """
124
        url = _encode(url) if url else 'http://127.0.0.1/'
125
        url += '' if url.endswith('/') else '/'
126
        if path:
127
            url += _encode(path[1:] if path.startswith('/') else path)
128
        for i, (key, val) in enumerate(params.items()):
129
            val = _encode(val)
130
            url += '%s%s' % ('&' if i else '?', key)
131
            if val:
132
                url += '=%s' % val
133
        parsed = urlparse(url)
134
        self.url = url
135
        self.path = parsed.path or '/'
136
        if parsed.query:
137
            self.path += '?%s' % parsed.query
138
        return (parsed.scheme, parsed.netloc)
139

    
140
    def __init__(
141
            self, method, url, path,
142
            data=None, headers={}, params={}):
143
        method = method.upper()
144
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
145
        if headers:
146
            assert isinstance(headers, dict)
147
            self.headers = dict(headers)
148
        self.method, self.data = method, data
149
        self.scheme, self.netloc = self._connection_info(url, path, params)
150

    
151
    def perform(self, conn):
152
        """
153
        :param conn: (httplib connection object)
154

155
        :returns: (HTTPResponse)
156
        """
157
        #  sendlog.debug(
158
        #    'RequestManager.perform mthd(%s), url(%s), headrs(%s), bdlen(%s)',
159
        #    self.method, self.url, self.headers, self.data)
160
        conn.request(
161
            method=str(self.method.upper()),
162
            url=str(self.path),
163
            headers=self.headers,
164
            body=self.data)
165
        while True:
166
            try:
167
                return conn.getresponse()
168
            except ResponseNotReady:
169
                sleep(0.03 * random())
170

    
171

    
172
class ResponseManager(object):
173
    """Manage the http request and handle the response data, headers, etc."""
174

    
175
    def __init__(self, request, poolsize=None):
176
        """
177
        :param request: (RequestManager)
178
        """
179
        self.request = request
180
        self._request_performed = False
181
        self.poolsize = poolsize
182

    
183
    def _get_response(self):
184
        if self._request_performed:
185
            return
186

    
187
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
188
        try:
189
            with PooledHTTPConnection(
190
                    self.request.netloc, self.request.scheme,
191
                    **pool_kw) as connection:
192
                r = self.request.perform(connection)
193
                #  recvlog.debug('ResponseManager(%s):' % r)
194
                self._request_performed = True
195
                self._headers = dict()
196
                for k, v in r.getheaders():
197
                    self.headers[k] = v
198
                    #  recvlog.debug('\t%s: %s\t(%s)' % (k, v, r))
199
                self._content = r.read()
200
                self._status_code = r.status
201
                self._status = r.reason
202
        except Exception as err:
203
            from kamaki.clients import recvlog
204
            from traceback import format_stack
205
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
206
            raise ClientError(
207
                'Failed while http-connecting to %s (%s)' % (
208
                    self.request.url,
209
                    err),
210
                1000)
211

    
212
    @property
213
    def status_code(self):
214
        self._get_response()
215
        return self._status_code
216

    
217
    @property
218
    def status(self):
219
        self._get_response()
220
        return self._status
221

    
222
    @property
223
    def headers(self):
224
        self._get_response()
225
        return self._headers
226

    
227
    @property
228
    def content(self):
229
        self._get_response()
230
        return self._content
231

    
232
    @property
233
    def text(self):
234
        """
235
        :returns: (str) content
236
        """
237
        self._get_response()
238
        return '%s' % self._content
239

    
240
    @property
241
    def json(self):
242
        """
243
        :returns: (dict) squeezed from json-formated content
244
        """
245
        self._get_response()
246
        try:
247
            return loads(self._content)
248
        except ValueError as err:
249
            ClientError('Response not formated in JSON - %s' % err)
250

    
251

    
252
class SilentEvent(Thread):
253
    """ Thread-run method(*args, **kwargs)"""
254
    def __init__(self, method, *args, **kwargs):
255
        super(self.__class__, self).__init__()
256
        self.method = method
257
        self.args = args
258
        self.kwargs = kwargs
259

    
260
    @property
261
    def exception(self):
262
        return getattr(self, '_exception', False)
263

    
264
    @property
265
    def value(self):
266
        return getattr(self, '_value', None)
267

    
268
    def run(self):
269
        try:
270
            self._value = self.method(*(self.args), **(self.kwargs))
271
        except Exception as e:
272
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
273
                self,
274
                type(e),
275
                e.status if isinstance(e, ClientError) else '',
276
                e))
277
            self._exception = e
278

    
279

    
280
class Client(object):
281

    
282
    def __init__(self, base_url, token):
283
        self.base_url = base_url
284
        self.token = token
285
        self.headers, self.params = dict(), dict()
286
        self.DATE_FORMATS = [
287
            '%a %b %d %H:%M:%S %Y',
288
            '%A, %d-%b-%y %H:%M:%S GMT',
289
            '%a, %d %b %Y %H:%M:%S GMT']
290
        self.MAX_THREADS = 7
291

    
292
    def _init_thread_limit(self, limit=1):
293
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
294
        self._thread_limit = limit
295
        self._elapsed_old = 0.0
296
        self._elapsed_new = 0.0
297

    
298
    def _watch_thread_limit(self, threadlist):
299
        self._thread_limit = getattr(self, '_thread_limit', 1)
300
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
301
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
302
        recvlog.debug('# running threads: %s' % len(threadlist))
303
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
304
                self._thread_limit < self.MAX_THREADS):
305
            self._thread_limit += 1
306
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
307
            self._thread_limit -= 1
308

    
309
        self._elapsed_old = self._elapsed_new
310
        if len(threadlist) >= self._thread_limit:
311
            self._elapsed_new = 0.0
312
            for thread in threadlist:
313
                begin_time = time()
314
                thread.join()
315
                self._elapsed_new += time() - begin_time
316
            self._elapsed_new = self._elapsed_new / len(threadlist)
317
            return []
318
        return threadlist
319

    
320
    def _raise_for_status(self, r):
321
        clienterrorlog.debug('raise err from [%s] of type[%s]' % (r, type(r)))
322
        status_msg = getattr(r, 'status', None) or ''
323
        try:
324
            message = '%s %s\n' % (status_msg, r.text)
325
        except:
326
            message = '%s %s\n' % (status_msg, r)
327
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
328
        raise ClientError(message, status=status)
329

    
330
    def set_header(self, name, value, iff=True):
331
        """Set a header 'name':'value'"""
332
        if value is not None and iff:
333
            self.headers[name] = value
334

    
335
    def set_param(self, name, value=None, iff=True):
336
        if iff:
337
            self.params[name] = value
338

    
339
    def request(
340
            self, method, path,
341
            async_headers=dict(), async_params=dict(),
342
            **kwargs):
343
        """In threaded/asynchronous requests, headers and params are not safe
344
        Therefore, the standard self.set_header/param system can be used only
345
        for headers and params that are common for all requests. All other
346
        params and headers should passes as
347
        @param async_headers
348
        @async_params
349
        E.g. in most queries the 'X-Auth-Token' header might be the same for
350
        all, but the 'Range' header might be different from request to request.
351
        """
352
        assert isinstance(method, str) or isinstance(method, unicode)
353
        assert method
354
        assert isinstance(path, str) or isinstance(path, unicode)
355
        try:
356
            headers = dict(self.headers)
357
            headers.update(async_headers)
358
            params = dict(self.params)
359
            params.update(async_params)
360
            success = kwargs.pop('success', 200)
361
            data = kwargs.pop('data', None)
362
            headers.setdefault('X-Auth-Token', self.token)
363
            if 'json' in kwargs:
364
                data = dumps(kwargs.pop('json'))
365
                headers.setdefault('Content-Type', 'application/json')
366
            if data:
367
                headers.setdefault('Content-Length', '%s' % len(data))
368

    
369
            req = RequestManager(
370
                method, self.base_url, path,
371
                data=data, headers=headers, params=params)
372
            sendlog.info('commit a %s @ %s\t[%s]', method, self.base_url, self)
373
            sendlog.info('\tpath: %s\t[%s]', req.path, self)
374
            for key, val in req.headers.items():
375
                if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
376
                    continue
377
                sendlog.info('\t%s: %s [%s]', key, val, self)
378
            if data:
379
                datasendlog.info(data)
380
            sendlog.info('END HTTP request commit\t[%s]', self)
381

    
382
            r = ResponseManager(req)
383
            recvlog.info('%d %s', r.status_code, r.status)
384
            for key, val in r.headers.items():
385
                if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
386
                    continue
387
                recvlog.info('%s: %s', key, val)
388
            if r.content:
389
                datarecvlog.info(r.content)
390
        finally:
391
            self.headers = dict()
392
            self.params = dict()
393

    
394
        if success is not None:
395
            # Success can either be an int or a collection
396
            success = (success,) if isinstance(success, int) else success
397
            if r.status_code not in success:
398
                self._raise_for_status(r)
399
        return r
400

    
401
    def delete(self, path, **kwargs):
402
        return self.request('delete', path, **kwargs)
403

    
404
    def get(self, path, **kwargs):
405
        return self.request('get', path, **kwargs)
406

    
407
    def head(self, path, **kwargs):
408
        return self.request('head', path, **kwargs)
409

    
410
    def post(self, path, **kwargs):
411
        return self.request('post', path, **kwargs)
412

    
413
    def put(self, path, **kwargs):
414
        return self.request('put', path, **kwargs)
415

    
416
    def copy(self, path, **kwargs):
417
        return self.request('copy', path, **kwargs)
418

    
419
    def move(self, path, **kwargs):
420
        return self.request('move', path, **kwargs)