Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 0d3785a1

History | View | Annotate | Download (20.3 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103
    LOG_PID = False
104
    _token = None
105

    
106

    
107
class RequestManager(Logged):
108
    """Handle http request information"""
109

    
110
    def _connection_info(self, url, path, params={}):
111
        """ Set self.url to scheme://netloc/?params
112
        :param url: (str or unicode) The service url
113

114
        :param path: (str or unicode) The service path (url/path)
115

116
        :param params: (dict) Parameters to add to final url
117

118
        :returns: (scheme, netloc)
119
        """
120
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
121
        url += '' if url.endswith('/') else '/'
122
        if path:
123
            url += _encode(path[1:] if path.startswith('/') else path)
124
        delim = '?'
125
        for key, val in params.items():
126
            val = quote('' if val in (None, False) else _encode('%s' % val))
127
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
128
            delim = '&'
129
        parsed = urlparse(url)
130
        self.url = url
131
        self.path = parsed.path or '/'
132
        if parsed.query:
133
            self.path += '?%s' % parsed.query
134
        return (parsed.scheme, parsed.netloc)
135

    
136
    def __init__(
137
            self, method, url, path,
138
            data=None, headers={}, params={}):
139
        method = method.upper()
140
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
141
        if headers:
142
            assert isinstance(headers, dict)
143
        self.headers = dict(headers)
144
        self.method, self.data = method, data
145
        self.scheme, self.netloc = self._connection_info(url, path, params)
146

    
147
    def dump_log(self):
148
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
149
        sendlog.info('- -  -   -     -        -             -')
150
        sendlog.info('%s %s://%s%s%s' % (
151
            self.method, self.scheme, self.netloc, self.path, plog))
152
        for key, val in self.headers.items():
153
            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
154
                self._token, val = val, '...'
155
            sendlog.info('  %s: %s%s' % (key, val, plog))
156
        if self.data:
157
            sendlog.info('data size: %s%s' % (len(self.data), plog))
158
            if self.LOG_DATA:
159
                sendlog.info(self.data.replace(self._token, '...') if (
160
                    self._token) else self.data)
161
        else:
162
            sendlog.info('data size: 0%s' % plog)
163

    
164
    def perform(self, conn):
165
        """
166
        :param conn: (httplib connection object)
167

168
        :returns: (HTTPResponse)
169
        """
170
        self.dump_log()
171
        conn.request(
172
            method=str(self.method.upper()),
173
            url=str(self.path),
174
            headers=self.headers,
175
            body=self.data)
176
        sendlog.info('')
177
        keep_trying = TIMEOUT
178
        while keep_trying > 0:
179
            try:
180
                return conn.getresponse()
181
            except ResponseNotReady:
182
                wait = 0.03 * random()
183
                sleep(wait)
184
                keep_trying -= wait
185
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
186
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
187
        recvlog.debug(logmsg)
188
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
189

    
190

    
191
class ResponseManager(Logged):
192
    """Manage the http request and handle the response data, headers, etc."""
193

    
194
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
195
        """
196
        :param request: (RequestManager)
197

198
        :param poolsize: (int) the size of the connection pool
199

200
        :param connection_retry_limit: (int)
201
        """
202
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
203
        self.request = request
204
        self._request_performed = False
205
        self.poolsize = poolsize
206

    
207
    def _get_response(self):
208
        if self._request_performed:
209
            return
210

    
211
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
212
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
213
            try:
214
                with PooledHTTPConnection(
215
                        self.request.netloc, self.request.scheme,
216
                        **pool_kw) as connection:
217
                    self.request.LOG_TOKEN = self.LOG_TOKEN
218
                    self.request.LOG_DATA = self.LOG_DATA
219
                    self.request.LOG_PID = self.LOG_PID
220
                    r = self.request.perform(connection)
221
                    plog = ''
222
                    if self.LOG_PID:
223
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
224
                            self, r, self.request))
225
                        plog = '\t[%s]' % self
226
                    self._request_performed = True
227
                    self._status_code, self._status = r.status, unquote(
228
                        r.reason)
229
                    recvlog.info(
230
                        '%d %s%s' % (
231
                            self.status_code, self.status, plog))
232
                    self._headers = dict()
233
                    for k, v in r.getheaders():
234
                        if k.lower in ('x-auth-token', ) and (
235
                                not self.LOG_TOKEN):
236
                            self._token, v = v, '...'
237
                        v = unquote(v)
238
                        self._headers[k] = v
239
                        recvlog.info('  %s: %s%s' % (k, v, plog))
240
                    self._content = r.read()
241
                    recvlog.info('data size: %s%s' % (
242
                        len(self._content) if self._content else 0, plog))
243
                    if self.LOG_DATA and self._content:
244
                        data = '%s%s' % (self._content, plog)
245
                        if self._token:
246
                            data = data.replace(self._token, '...')
247
                        recvlog.info(data)
248
                    recvlog.info('-             -        -     -   -  - -')
249
                break
250
            except Exception as err:
251
                if isinstance(err, HTTPException):
252
                    if retries >= self.CONNECTION_TRY_LIMIT:
253
                        raise ClientError(
254
                            'Connection to %s failed %s times (%s: %s )' % (
255
                                self.request.url, retries, type(err), err))
256
                else:
257
                    from traceback import format_stack
258
                    recvlog.debug(
259
                        '\n'.join(['%s' % type(err)] + format_stack()))
260
                    raise ClientError(
261
                        'Failed while http-connecting to %s (%s)' % (
262
                            self.request.url, err))
263

    
264
    @property
265
    def status_code(self):
266
        self._get_response()
267
        return self._status_code
268

    
269
    @property
270
    def status(self):
271
        self._get_response()
272
        return self._status
273

    
274
    @property
275
    def headers(self):
276
        self._get_response()
277
        return self._headers
278

    
279
    @property
280
    def content(self):
281
        self._get_response()
282
        return self._content
283

    
284
    @property
285
    def text(self):
286
        """
287
        :returns: (str) content
288
        """
289
        self._get_response()
290
        return '%s' % self._content
291

    
292
    @property
293
    def json(self):
294
        """
295
        :returns: (dict) squeezed from json-formated content
296
        """
297
        self._get_response()
298
        try:
299
            return loads(self._content)
300
        except ValueError as err:
301
            raise ClientError('Response not formated in JSON - %s' % err)
302

    
303

    
304
class SilentEvent(Thread):
305
    """Thread-run method(*args, **kwargs)"""
306
    def __init__(self, method, *args, **kwargs):
307
        super(self.__class__, self).__init__()
308
        self.method = method
309
        self.args = args
310
        self.kwargs = kwargs
311

    
312
    @property
313
    def exception(self):
314
        return getattr(self, '_exception', False)
315

    
316
    @property
317
    def value(self):
318
        return getattr(self, '_value', None)
319

    
320
    def run(self):
321
        try:
322
            self._value = self.method(*(self.args), **(self.kwargs))
323
        except Exception as e:
324
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
325
                self,
326
                type(e),
327
                e.status if isinstance(e, ClientError) else '',
328
                e))
329
            self._exception = e
330

    
331

    
332
class Client(Logged):
333

    
334
    MAX_THREADS = 1
335
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
336
    CONNECTION_RETRY_LIMIT = 0
337

    
338
    def __init__(self, base_url, token):
339
        assert base_url, 'No base_url for client %s' % self
340
        self.base_url = base_url
341
        self.token = token
342
        self.headers, self.params = dict(), dict()
343
        self.poolsize = None
344

    
345
    def _init_thread_limit(self, limit=1):
346
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
347
        self._thread_limit = limit
348
        self._elapsed_old = 0.0
349
        self._elapsed_new = 0.0
350

    
351
    def _watch_thread_limit(self, threadlist):
352
        self._thread_limit = getattr(self, '_thread_limit', 1)
353
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
354
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
355
        recvlog.debug('# running threads: %s' % len(threadlist))
356
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
357
                self._thread_limit < self.MAX_THREADS):
358
            self._thread_limit += 1
359
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
360
            self._thread_limit -= 1
361

    
362
        self._elapsed_old = self._elapsed_new
363
        if len(threadlist) >= self._thread_limit:
364
            self._elapsed_new = 0.0
365
            for thread in threadlist:
366
                begin_time = time()
367
                thread.join()
368
                self._elapsed_new += time() - begin_time
369
            self._elapsed_new = self._elapsed_new / len(threadlist)
370
            return []
371
        return threadlist
372

    
373
    def async_run(self, method, kwarg_list):
374
        """Fire threads of operations
375

376
        :param method: the method to run in each thread
377

378
        :param kwarg_list: (list of dicts) the arguments to pass in each method
379
            call
380

381
        :returns: (list) the results of each method call w.r. to the order of
382
            kwarg_list
383
        """
384
        flying, results = {}, {}
385
        self._init_thread_limit()
386
        for index, kwargs in enumerate(kwarg_list):
387
            self._watch_thread_limit(flying.values())
388
            flying[index] = SilentEvent(method=method, **kwargs)
389
            flying[index].start()
390
            unfinished = {}
391
            for key, thread in flying.items():
392
                if thread.isAlive():
393
                    unfinished[key] = thread
394
                elif thread.exception:
395
                    raise thread.exception
396
                else:
397
                    results[key] = thread.value
398
            flying = unfinished
399
        sendlog.info('- - - wait for threads to finish')
400
        for key, thread in flying.items():
401
            if thread.isAlive():
402
                thread.join()
403
            if thread.exception:
404
                raise thread.exception
405
            results[key] = thread.value
406
        return results.values()
407

    
408
    def _raise_for_status(self, r):
409
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
410
        status_msg = getattr(r, 'status', None) or ''
411
        try:
412
            message = '%s %s\n' % (status_msg, r.text)
413
        except:
414
            message = '%s %s\n' % (status_msg, r)
415
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
416
        raise ClientError(message, status=status)
417

    
418
    def set_header(self, name, value, iff=True):
419
        """Set a header 'name':'value'"""
420
        if value is not None and iff:
421
            self.headers[name] = unicode(value)
422

    
423
    def set_param(self, name, value=None, iff=True):
424
        if iff:
425
            self.params[name] = '%s' % value  # unicode(value)
426

    
427
    def request(
428
            self, method, path,
429
            async_headers=dict(), async_params=dict(),
430
            **kwargs):
431
        """Commit an HTTP request to base_url/path
432
        Requests are commited to and performed by Request/ResponseManager
433
        These classes perform a lazy http request. Present method, by default,
434
        enforces them to perform the http call. Hint: call present method with
435
        success=None to get a non-performed ResponseManager object.
436
        """
437
        assert isinstance(method, str) or isinstance(method, unicode)
438
        assert method
439
        assert isinstance(path, str) or isinstance(path, unicode)
440
        try:
441
            headers = dict(self.headers)
442
            headers.update(async_headers)
443
            params = dict(self.params)
444
            params.update(async_params)
445
            success = kwargs.pop('success', 200)
446
            data = kwargs.pop('data', None)
447
            headers.setdefault('X-Auth-Token', self.token)
448
            if 'json' in kwargs:
449
                data = dumps(kwargs.pop('json'))
450
                headers.setdefault('Content-Type', 'application/json')
451
            if data:
452
                headers.setdefault('Content-Length', '%s' % len(data))
453

    
454
            plog = ('\t[%s]' % self) if self.LOG_PID else ''
455
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
456
            req = RequestManager(
457
                method, self.base_url, path,
458
                data=data, headers=headers, params=params)
459
            #  req.log()
460
            r = ResponseManager(
461
                req,
462
                poolsize=self.poolsize,
463
                connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
464
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
465
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
466
            r._token = headers['X-Auth-Token']
467
        finally:
468
            self.headers = dict()
469
            self.params = dict()
470

    
471
        if success is not None:
472
            # Success can either be an int or a collection
473
            success = (success,) if isinstance(success, int) else success
474
            if r.status_code not in success:
475
                self._raise_for_status(r)
476
        return r
477

    
478
    def delete(self, path, **kwargs):
479
        return self.request('delete', path, **kwargs)
480

    
481
    def get(self, path, **kwargs):
482
        return self.request('get', path, **kwargs)
483

    
484
    def head(self, path, **kwargs):
485
        return self.request('head', path, **kwargs)
486

    
487
    def post(self, path, **kwargs):
488
        return self.request('post', path, **kwargs)
489

    
490
    def put(self, path, **kwargs):
491
        return self.request('put', path, **kwargs)
492

    
493
    def copy(self, path, **kwargs):
494
        return self.request('copy', path, **kwargs)
495

    
496
    def move(self, path, **kwargs):
497
        return self.request('move', path, **kwargs)
498

    
499

    
500
class Waiter(object):
501

    
502
    def _wait(
503
            self, item_id, wait_status, get_status,
504
            delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
505
        """Wait while the item is still in wait_status or to reach it
506

507
        :param server_id: integer (str or int)
508

509
        :param wait_status: (str)
510

511
        :param get_status: (method(self, item_id)) if called, returns
512
            (status, progress %) If no way to tell progress, return None
513

514
        :param delay: time interval between retries
515

516
        :param wait_cb: (method(total steps)) returns a generator for
517
            reporting progress or timeouts i.e., for a progress bar
518

519
        :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)
520

521
        :returns: (str) the new mode if successful, (bool) False if timed out
522
        """
523
        status, progress = get_status(self, item_id)
524

    
525
        if wait_cb:
526
            wait_gen = wait_cb(max_wait // delay)
527
            wait_gen.next()
528

    
529
        if wait_for_status ^ (status != wait_status):
530
            # if wait_cb:
531
            #     try:
532
            #         wait_gen.next()
533
            #     except Exception:
534
            #         pass
535
            return status
536
        old_wait = total_wait = 0
537

    
538
        while (wait_for_status ^ (status == wait_status)) and (
539
                total_wait <= max_wait):
540
            if wait_cb:
541
                try:
542
                    for i in range(total_wait - old_wait):
543
                        wait_gen.next()
544
                except Exception:
545
                    break
546
            old_wait = total_wait
547
            total_wait = progress or total_wait + 1
548
            sleep(delay)
549
            status, progress = get_status(self, item_id)
550

    
551
        if total_wait < max_wait:
552
            if wait_cb:
553
                try:
554
                    for i in range(max_wait):
555
                        wait_gen.next()
556
                except:
557
                    pass
558
        return status if (wait_for_status ^ (status != wait_status)) else False
559

    
560
    def wait_for(
561
            self, item_id, target_status, get_status,
562
            delay=1, max_wait=100, wait_cb=None):
563
        self._wait(
564
            item_id, target_status, get_status, delay, max_wait, wait_cb,
565
            wait_for_status=True)
566

    
567
    def wait_while(
568
            self, item_id, target_status, get_status,
569
            delay=1, max_wait=100, wait_cb=None):
570
        self._wait(
571
            item_id, target_status, get_status, delay, max_wait, wait_cb,
572
            wait_for_status=False)