Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 6293aa78

History | View | Annotate | Download (21.8 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103
    LOG_PID = False
104
    _token = None
105

    
106

    
107
class RequestManager(Logged):
108
    """Handle http request information"""
109

    
110
    def _connection_info(self, url, path, params={}):
111
        """ Set self.url to scheme://netloc/?params
112
        :param url: (str or unicode) The service url
113

114
        :param path: (str or unicode) The service path (url/path)
115

116
        :param params: (dict) Parameters to add to final url
117

118
        :returns: (scheme, netloc)
119
        """
120
        url = url or 'http://127.0.0.1/'
121
        url += '' if url.endswith('/') else '/'
122
        if path:
123
            url += _encode(path[1:] if path.startswith('/') else path)
124
        delim = '?'
125
        for key, val in params.items():
126
            val = quote('' if val in (None, False) else '%s' % val)
127
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
128
            delim = '&'
129
        parsed = urlparse(url)
130
        self.url = _encode(u'%s' % url)
131
        self.path = _encode((u'%s' % parsed.path) if parsed.path else '/')
132
        if parsed.query:
133
            self.path += _encode(u'?%s' % parsed.query)
134
        return (_encode(parsed.scheme), _encode(parsed.netloc))
135

    
136
    def __init__(
137
            self, method, url, path,
138
            data=None, headers={}, params={}):
139
        method = method.upper()
140
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
141
        if headers:
142
            assert isinstance(headers, dict)
143
        self.headers = dict(headers)
144
        self.method, self.data = method, data
145
        self.scheme, self.netloc = self._connection_info(url, path, params)
146

    
147
    def dump_log(self):
148
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
149
        sendlog.info('- -  -   -     -        -             -')
150
        sendlog.info('%s %s://%s%s%s' % (
151
            self.method, self.scheme, self.netloc, self.path, plog))
152
        for key, val in self.headers.items():
153
            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
154
                self._token, val = val, '...'
155
            sendlog.info('  %s: %s%s' % (key, val, plog))
156
        if self.data:
157
            sendlog.info('data size: %s%s' % (len(self.data), plog))
158
            if self.LOG_DATA:
159
                sendlog.info(self.data.replace(self._token, '...') if (
160
                    self._token) else self.data)
161
        else:
162
            sendlog.info('data size: 0%s' % plog)
163

    
164
    def _encode_headers(self):
165
        headers = self.headers
166
        for k, v in self.headers.items():
167
            headers[k] = quote(v)
168
        self.headers = headers
169

    
170
    def perform(self, conn):
171
        """
172
        :param conn: (httplib connection object)
173

174
        :returns: (HTTPResponse)
175
        """
176
        self._encode_headers()
177
        self.dump_log()
178
        conn.request(
179
            method=self.method.upper(),
180
            url=('%s' % self.path) or '',
181
            headers=self.headers,
182
            body=self.data)
183
        sendlog.info('')
184
        keep_trying = TIMEOUT
185
        while keep_trying > 0:
186
            try:
187
                return conn.getresponse()
188
            except ResponseNotReady:
189
                wait = 0.03 * random()
190
                sleep(wait)
191
                keep_trying -= wait
192
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
193
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
194
        recvlog.debug(logmsg)
195
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
196

    
197

    
198
class ResponseManager(Logged):
199
    """Manage the http request and handle the response data, headers, etc."""
200

    
201
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
202
        """
203
        :param request: (RequestManager)
204

205
        :param poolsize: (int) the size of the connection pool
206

207
        :param connection_retry_limit: (int)
208
        """
209
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
210
        self.request = request
211
        self._request_performed = False
212
        self.poolsize = poolsize
213
        self._headers_to_decode, self._header_prefices = [], []
214

    
215
    def _get_headers_to_decode(self, headers):
216
        keys = set([k.lower() for k, v in headers])
217
        encodable = list(keys.intersection(self.headers_to_decode))
218

    
219
        def has_prefix(s):
220
            for k in self.header_prefices:
221
                if s.startswith(k):
222
                    return True
223
            return False
224
        return encodable + filter(has_prefix, keys.difference(encodable))
225

    
226
    def _get_response(self):
227
        if self._request_performed:
228
            return
229

    
230
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
231
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
232
            try:
233
                with PooledHTTPConnection(
234
                        self.request.netloc, self.request.scheme,
235
                        **pool_kw) as connection:
236
                    self.request.LOG_TOKEN = self.LOG_TOKEN
237
                    self.request.LOG_DATA = self.LOG_DATA
238
                    self.request.LOG_PID = self.LOG_PID
239
                    r = self.request.perform(connection)
240
                    plog = ''
241
                    if self.LOG_PID:
242
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
243
                            self, r, self.request))
244
                        plog = '\t[%s]' % self
245
                    self._request_performed = True
246
                    self._status_code, self._status = r.status, unquote(
247
                        r.reason)
248
                    recvlog.info(
249
                        '%d %s%s' % (
250
                            self.status_code, self.status, plog))
251
                    self._headers = dict()
252

    
253
                    r_headers = r.getheaders()
254
                    enc_headers = self._get_headers_to_decode(r_headers)
255
                    for k, v in r_headers:
256
                        if k.lower in ('x-auth-token', ) and (
257
                                not self.LOG_TOKEN):
258
                            self._token, v = v, '...'
259
                        elif k.lower() in enc_headers:
260
                            v = unquote(v).decode('utf-8')
261
                        self._headers[k] = v
262
                        recvlog.info('  %s: %s%s' % (k, v, plog))
263
                    self._content = r.read()
264
                    recvlog.info('data size: %s%s' % (
265
                        len(self._content) if self._content else 0, plog))
266
                    if self.LOG_DATA and self._content:
267
                        data = '%s%s' % (self._content, plog)
268
                        if self._token:
269
                            data = data.replace(self._token, '...')
270
                        recvlog.info(data)
271
                    recvlog.info('-             -        -     -   -  - -')
272
                break
273
            except Exception as err:
274
                if isinstance(err, HTTPException):
275
                    if retries >= self.CONNECTION_TRY_LIMIT:
276
                        raise ClientError(
277
                            'Connection to %s failed %s times (%s: %s )' % (
278
                                self.request.url, retries, type(err), err))
279
                else:
280
                    from traceback import format_stack
281
                    recvlog.debug(
282
                        '\n'.join(['%s' % type(err)] + format_stack()))
283
                    raise
284

    
285
    @property
286
    def status_code(self):
287
        self._get_response()
288
        return self._status_code
289

    
290
    @property
291
    def status(self):
292
        self._get_response()
293
        return self._status
294

    
295
    @property
296
    def headers(self):
297
        self._get_response()
298
        return self._headers
299

    
300
    @property
301
    def content(self):
302
        self._get_response()
303
        return self._content
304

    
305
    @property
306
    def text(self):
307
        """
308
        :returns: (str) content
309
        """
310
        self._get_response()
311
        return '%s' % self._content
312

    
313
    @property
314
    def headers_to_decode(self):
315
        return self._headers_to_decode
316

    
317
    @headers_to_decode.setter
318
    def headers_to_decode(self, header_keys):
319
        self._headers_to_decode += [k.lower() for k in header_keys]
320
        self._headers_to_decode = list(set(self._headers_to_decode))
321

    
322
    @property
323
    def header_prefices(self):
324
        return self._header_prefices
325

    
326
    @header_prefices.setter
327
    def header_prefices(self, header_key_prefices):
328
        self._header_prefices += [p.lower() for p in header_key_prefices]
329
        self._header_prefices = list(set(self._header_prefices))
330

    
331
    @property
332
    def json(self):
333
        """
334
        :returns: (dict) squeezed from json-formated content
335
        """
336
        self._get_response()
337
        try:
338
            return loads(self._content)
339
        except ValueError as err:
340
            raise ClientError('Response not formated in JSON - %s' % err)
341

    
342

    
343
class SilentEvent(Thread):
344
    """Thread-run method(*args, **kwargs)"""
345
    def __init__(self, method, *args, **kwargs):
346
        super(self.__class__, self).__init__()
347
        self.method = method
348
        self.args = args
349
        self.kwargs = kwargs
350

    
351
    @property
352
    def exception(self):
353
        return getattr(self, '_exception', False)
354

    
355
    @property
356
    def value(self):
357
        return getattr(self, '_value', None)
358

    
359
    def run(self):
360
        try:
361
            self._value = self.method(*(self.args), **(self.kwargs))
362
        except Exception as e:
363
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
364
                self,
365
                type(e),
366
                e.status if isinstance(e, ClientError) else '',
367
                e))
368
            self._exception = e
369

    
370

    
371
class Client(Logged):
372

    
373
    MAX_THREADS = 1
374
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
375
    CONNECTION_RETRY_LIMIT = 0
376

    
377
    def __init__(self, base_url, token):
378
        assert base_url, 'No base_url for client %s' % self
379
        self.base_url = base_url
380
        self.token = token
381
        self.headers, self.params = dict(), dict()
382
        self.poolsize = None
383
        self.headers_to_decode, self.header_prefices = [], []
384

    
385
    def _init_thread_limit(self, limit=1):
386
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
387
        self._thread_limit = limit
388
        self._elapsed_old = 0.0
389
        self._elapsed_new = 0.0
390

    
391
    def _watch_thread_limit(self, threadlist):
392
        self._thread_limit = getattr(self, '_thread_limit', 1)
393
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
394
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
395
        recvlog.debug('# running threads: %s' % len(threadlist))
396
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
397
                self._thread_limit < self.MAX_THREADS):
398
            self._thread_limit += 1
399
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
400
            self._thread_limit -= 1
401

    
402
        self._elapsed_old = self._elapsed_new
403
        if len(threadlist) >= self._thread_limit:
404
            self._elapsed_new = 0.0
405
            for thread in threadlist:
406
                begin_time = time()
407
                thread.join()
408
                self._elapsed_new += time() - begin_time
409
            self._elapsed_new = self._elapsed_new / len(threadlist)
410
            return []
411
        return threadlist
412

    
413
    def async_run(self, method, kwarg_list):
414
        """Fire threads of operations
415

416
        :param method: the method to run in each thread
417

418
        :param kwarg_list: (list of dicts) the arguments to pass in each method
419
            call
420

421
        :returns: (list) the results of each method call w.r. to the order of
422
            kwarg_list
423
        """
424
        flying, results = {}, {}
425
        self._init_thread_limit()
426
        for index, kwargs in enumerate(kwarg_list):
427
            self._watch_thread_limit(flying.values())
428
            flying[index] = SilentEvent(method=method, **kwargs)
429
            flying[index].start()
430
            unfinished = {}
431
            for key, thread in flying.items():
432
                if thread.isAlive():
433
                    unfinished[key] = thread
434
                elif thread.exception:
435
                    raise thread.exception
436
                else:
437
                    results[key] = thread.value
438
            flying = unfinished
439
        sendlog.info('- - - wait for threads to finish')
440
        for key, thread in flying.items():
441
            if thread.isAlive():
442
                thread.join()
443
            if thread.exception:
444
                raise thread.exception
445
            results[key] = thread.value
446
        return results.values()
447

    
448
    def _raise_for_status(self, r):
449
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
450
        status_msg = getattr(r, 'status', None) or ''
451
        try:
452
            message = '%s %s\n' % (status_msg, r.text)
453
        except:
454
            message = '%s %s\n' % (status_msg, r)
455
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
456
        raise ClientError(message, status=status)
457

    
458
    def set_header(self, name, value, iff=True):
459
        """Set a header 'name':'value'"""
460
        if value is not None and iff:
461
            self.headers['%s' % name] = '%s' % value
462

    
463
    def set_param(self, name, value=None, iff=True):
464
        if iff:
465
            self.params[name] = '%s' % value
466

    
467
    def request(
468
            self, method, path,
469
            async_headers=dict(), async_params=dict(),
470
            **kwargs):
471
        """Commit an HTTP request to base_url/path
472
        Requests are commited to and performed by Request/ResponseManager
473
        These classes perform a lazy http request. Present method, by default,
474
        enforces them to perform the http call. Hint: call present method with
475
        success=None to get a non-performed ResponseManager object.
476
        """
477
        assert isinstance(method, str) or isinstance(method, unicode)
478
        assert method
479
        assert isinstance(path, str) or isinstance(path, unicode)
480
        try:
481
            headers = dict(self.headers)
482
            headers.update(async_headers)
483
            params = dict(self.params)
484
            params.update(async_params)
485
            success = kwargs.pop('success', 200)
486
            data = kwargs.pop('data', None)
487
            headers.setdefault('X-Auth-Token', self.token)
488
            if 'json' in kwargs:
489
                data = dumps(kwargs.pop('json'))
490
                headers.setdefault('Content-Type', 'application/json')
491
            if data:
492
                headers.setdefault('Content-Length', '%s' % len(data))
493

    
494
            plog = ('\t[%s]' % self) if self.LOG_PID else ''
495
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
496
            req = RequestManager(
497
                method, self.base_url, path,
498
                data=data, headers=headers, params=params)
499
            #  req.log()
500
            r = ResponseManager(
501
                req,
502
                poolsize=self.poolsize,
503
                connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
504
            r.headers_to_decode = self.headers_to_decode
505
            r.header_prefices = self.header_prefices
506
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
507
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
508
            r._token = headers['X-Auth-Token']
509
        finally:
510
            self.headers = dict()
511
            self.params = dict()
512

    
513
        if success is not None:
514
            # Success can either be an int or a collection
515
            success = (success,) if isinstance(success, int) else success
516
            if r.status_code not in success:
517
                self._raise_for_status(r)
518
        return r
519

    
520
    def delete(self, path, **kwargs):
521
        return self.request('delete', path, **kwargs)
522

    
523
    def get(self, path, **kwargs):
524
        return self.request('get', path, **kwargs)
525

    
526
    def head(self, path, **kwargs):
527
        return self.request('head', path, **kwargs)
528

    
529
    def post(self, path, **kwargs):
530
        return self.request('post', path, **kwargs)
531

    
532
    def put(self, path, **kwargs):
533
        return self.request('put', path, **kwargs)
534

    
535
    def copy(self, path, **kwargs):
536
        return self.request('copy', path, **kwargs)
537

    
538
    def move(self, path, **kwargs):
539
        return self.request('move', path, **kwargs)
540

    
541

    
542
class Waiter(object):
543

    
544
    def _wait(
545
            self, item_id, wait_status, get_status,
546
            delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
547
        """Wait while the item is still in wait_status or to reach it
548

549
        :param server_id: integer (str or int)
550

551
        :param wait_status: (str)
552

553
        :param get_status: (method(self, item_id)) if called, returns
554
            (status, progress %) If no way to tell progress, return None
555

556
        :param delay: time interval between retries
557

558
        :param wait_cb: (method(total steps)) returns a generator for
559
            reporting progress or timeouts i.e., for a progress bar
560

561
        :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)
562

563
        :returns: (str) the new mode if successful, (bool) False if timed out
564
        """
565
        status, progress = get_status(self, item_id)
566

    
567
        if wait_cb:
568
            wait_gen = wait_cb(max_wait // delay)
569
            wait_gen.next()
570

    
571
        if wait_for_status ^ (status != wait_status):
572
            # if wait_cb:
573
            #     try:
574
            #         wait_gen.next()
575
            #     except Exception:
576
            #         pass
577
            return status
578
        old_wait = total_wait = 0
579

    
580
        while (wait_for_status ^ (status == wait_status)) and (
581
                total_wait <= max_wait):
582
            if wait_cb:
583
                try:
584
                    for i in range(total_wait - old_wait):
585
                        wait_gen.next()
586
                except Exception:
587
                    break
588
            old_wait = total_wait
589
            total_wait = progress or total_wait + 1
590
            sleep(delay)
591
            status, progress = get_status(self, item_id)
592

    
593
        if total_wait < max_wait:
594
            if wait_cb:
595
                try:
596
                    for i in range(max_wait):
597
                        wait_gen.next()
598
                except:
599
                    pass
600
        return status if (wait_for_status ^ (status != wait_status)) else False
601

    
602
    def wait_for(
603
            self, item_id, target_status, get_status,
604
            delay=1, max_wait=100, wait_cb=None):
605
        self._wait(
606
            item_id, target_status, get_status, delay, max_wait, wait_cb,
607
            wait_for_status=True)
608

    
609
    def wait_while(
610
            self, item_id, target_status, get_status,
611
            delay=1, max_wait=100, wait_cb=None):
612
        self._wait(
613
            item_id, target_status, get_status, delay, max_wait, wait_cb,
614
            wait_for_status=False)