Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ a6a44506

History | View | Annotate | Download (20.2 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103
    LOG_PID = False
104
    _token = None
105

    
106

    
107
class RequestManager(Logged):
108
    """Handle http request information"""
109

    
110
    def _connection_info(self, url, path, params={}):
111
        """ Set self.url to scheme://netloc/?params
112
        :param url: (str or unicode) The service url
113

114
        :param path: (str or unicode) The service path (url/path)
115

116
        :param params: (dict) Parameters to add to final url
117

118
        :returns: (scheme, netloc)
119
        """
120
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
121
        url += '' if url.endswith('/') else '/'
122
        if path:
123
            url += _encode(path[1:] if path.startswith('/') else path)
124
        delim = '?'
125
        for key, val in params.items():
126
            val = '' if val in (None, False) else _encode(u'%s' % val)
127
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
128
            delim = '&'
129
        parsed = urlparse(url)
130
        self.url = url
131
        self.path = parsed.path or '/'
132
        if parsed.query:
133
            self.path += '?%s' % parsed.query
134
        return (parsed.scheme, parsed.netloc)
135

    
136
    def __init__(
137
            self, method, url, path,
138
            data=None, headers={}, params={}):
139
        method = method.upper()
140
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
141
        if headers:
142
            assert isinstance(headers, dict)
143
        self.headers = dict(headers)
144
        self.method, self.data = method, data
145
        self.scheme, self.netloc = self._connection_info(url, path, params)
146

    
147
    def dump_log(self):
148
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
149
        sendlog.info('- -  -   -     -        -             -')
150
        sendlog.info('%s %s://%s%s%s' % (
151
            self.method, self.scheme, self.netloc, self.path, plog))
152
        for key, val in self.headers.items():
153
            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
154
                self._token, val = val, '...'
155
            sendlog.info('  %s: %s%s' % (key, val, plog))
156
        if self.data:
157
            sendlog.info('data size:%s%s' % (len(self.data), plog))
158
            if self.LOG_DATA:
159
                sendlog.info(self.data.replace(self._token, '...') if (
160
                    self._token) else self.data)
161
        else:
162
            sendlog.info('data size:0%s' % plog)
163

    
164
    def perform(self, conn):
165
        """
166
        :param conn: (httplib connection object)
167

168
        :returns: (HTTPResponse)
169
        """
170
        self.dump_log()
171
        conn.request(
172
            method=str(self.method.upper()),
173
            url=str(self.path),
174
            headers=self.headers,
175
            body=self.data)
176
        sendlog.info('')
177
        keep_trying = TIMEOUT
178
        while keep_trying > 0:
179
            try:
180
                return conn.getresponse()
181
            except ResponseNotReady:
182
                wait = 0.03 * random()
183
                sleep(wait)
184
                keep_trying -= wait
185
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
186
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
187
        recvlog.debug(logmsg)
188
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
189

    
190

    
191
class ResponseManager(Logged):
192
    """Manage the http request and handle the response data, headers, etc."""
193

    
194
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
195
        """
196
        :param request: (RequestManager)
197

198
        :param poolsize: (int) the size of the connection pool
199

200
        :param connection_retry_limit: (int)
201
        """
202
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
203
        self.request = request
204
        self._request_performed = False
205
        self.poolsize = poolsize
206

    
207
    def _get_response(self):
208
        if self._request_performed:
209
            return
210

    
211
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
212
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
213
            try:
214
                with PooledHTTPConnection(
215
                        self.request.netloc, self.request.scheme,
216
                        **pool_kw) as connection:
217
                    self.request.LOG_TOKEN = self.LOG_TOKEN
218
                    self.request.LOG_DATA = self.LOG_DATA
219
                    self.request.LOG_PID = self.LOG_PID
220
                    r = self.request.perform(connection)
221
                    plog = ''
222
                    if self.LOG_PID:
223
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
224
                            self, r, self.request))
225
                        plog = '\t[%s]' % self
226
                    self._request_performed = True
227
                    self._status_code, self._status = r.status, unquote(
228
                        r.reason)
229
                    recvlog.info(
230
                        '%d %s%s' % (
231
                            self.status_code, self.status, plog))
232
                    self._headers = dict()
233
                    for k, v in r.getheaders():
234
                        if k.lower in ('x-auth-token', ) and (
235
                                not self.LOG_TOKEN):
236
                            self._token, v = v, '...'
237
                        v = unquote(v)
238
                        self._headers[k] = v
239
                        recvlog.info('  %s: %s%s' % (k, v, plog))
240
                    self._content = r.read()
241
                    recvlog.info('data size: %s%s' % (
242
                        len(self._content) if self._content else 0, plog))
243
                    if self.LOG_DATA and self._content:
244
                        data = '%s%s' % (self._content, plog)
245
                        if self._token:
246
                            data = data.replace(self._token, '...')
247
                        recvlog.info(data)
248
                    recvlog.info('-             -        -     -   -  - -')
249
                break
250
            except Exception as err:
251
                if isinstance(err, HTTPException):
252
                    if retries >= self.CONNECTION_TRY_LIMIT:
253
                        raise ClientError(
254
                            'Connection to %s failed %s times (%s: %s )' % (
255
                                self.request.url, retries, type(err), err))
256
                else:
257
                    from traceback import format_stack
258
                    recvlog.debug(
259
                        '\n'.join(['%s' % type(err)] + format_stack()))
260
                    raise ClientError(
261
                        'Failed while http-connecting to %s (%s)' % (
262
                            self.request.url, err))
263

    
264
    @property
265
    def status_code(self):
266
        self._get_response()
267
        return self._status_code
268

    
269
    @property
270
    def status(self):
271
        self._get_response()
272
        return self._status
273

    
274
    @property
275
    def headers(self):
276
        self._get_response()
277
        return self._headers
278

    
279
    @property
280
    def content(self):
281
        self._get_response()
282
        return self._content
283

    
284
    @property
285
    def text(self):
286
        """
287
        :returns: (str) content
288
        """
289
        self._get_response()
290
        return '%s' % self._content
291

    
292
    @property
293
    def json(self):
294
        """
295
        :returns: (dict) squeezed from json-formated content
296
        """
297
        self._get_response()
298
        try:
299
            return loads(self._content)
300
        except ValueError as err:
301
            raise ClientError('Response not formated in JSON - %s' % err)
302

    
303

    
304
class SilentEvent(Thread):
305
    """Thread-run method(*args, **kwargs)"""
306
    def __init__(self, method, *args, **kwargs):
307
        super(self.__class__, self).__init__()
308
        self.method = method
309
        self.args = args
310
        self.kwargs = kwargs
311

    
312
    @property
313
    def exception(self):
314
        return getattr(self, '_exception', False)
315

    
316
    @property
317
    def value(self):
318
        return getattr(self, '_value', None)
319

    
320
    def run(self):
321
        try:
322
            self._value = self.method(*(self.args), **(self.kwargs))
323
        except Exception as e:
324
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
325
                self,
326
                type(e),
327
                e.status if isinstance(e, ClientError) else '',
328
                e))
329
            self._exception = e
330

    
331

    
332
class Client(Logged):
333

    
334
    MAX_THREADS = 1
335
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
336
    CONNECTION_RETRY_LIMIT = 0
337

    
338
    def __init__(self, base_url, token):
339
        assert base_url, 'No base_url for client %s' % self
340
        self.base_url = base_url
341
        self.token = token
342
        self.headers, self.params = dict(), dict()
343

    
344
    def _init_thread_limit(self, limit=1):
345
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
346
        self._thread_limit = limit
347
        self._elapsed_old = 0.0
348
        self._elapsed_new = 0.0
349

    
350
    def _watch_thread_limit(self, threadlist):
351
        self._thread_limit = getattr(self, '_thread_limit', 1)
352
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
353
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
354
        recvlog.debug('# running threads: %s' % len(threadlist))
355
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
356
                self._thread_limit < self.MAX_THREADS):
357
            self._thread_limit += 1
358
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
359
            self._thread_limit -= 1
360

    
361
        self._elapsed_old = self._elapsed_new
362
        if len(threadlist) >= self._thread_limit:
363
            self._elapsed_new = 0.0
364
            for thread in threadlist:
365
                begin_time = time()
366
                thread.join()
367
                self._elapsed_new += time() - begin_time
368
            self._elapsed_new = self._elapsed_new / len(threadlist)
369
            return []
370
        return threadlist
371

    
372
    def async_run(self, method, kwarg_list):
373
        """Fire threads of operations
374

375
        :param method: the method to run in each thread
376

377
        :param kwarg_list: (list of dicts) the arguments to pass in each method
378
            call
379

380
        :returns: (list) the results of each method call w.r. to the order of
381
            kwarg_list
382
        """
383
        flying, results = {}, {}
384
        self._init_thread_limit()
385
        for index, kwargs in enumerate(kwarg_list):
386
            self._watch_thread_limit(flying.values())
387
            flying[index] = SilentEvent(method=method, **kwargs)
388
            flying[index].start()
389
            unfinished = {}
390
            for key, thread in flying.items():
391
                if thread.isAlive():
392
                    unfinished[key] = thread
393
                elif thread.exception:
394
                    raise thread.exception
395
                else:
396
                    results[key] = thread.value
397
            flying = unfinished
398
        sendlog.info('- - - wait for threads to finish')
399
        for key, thread in flying.items():
400
            if thread.isAlive():
401
                thread.join()
402
            if thread.exception:
403
                raise thread.exception
404
            results[key] = thread.value
405
        return results.values()
406

    
407
    def _raise_for_status(self, r):
408
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
409
        status_msg = getattr(r, 'status', None) or ''
410
        try:
411
            message = '%s %s\n' % (status_msg, r.text)
412
        except:
413
            message = '%s %s\n' % (status_msg, r)
414
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
415
        raise ClientError(message, status=status)
416

    
417
    def set_header(self, name, value, iff=True):
418
        """Set a header 'name':'value'"""
419
        if value is not None and iff:
420
            self.headers[name] = unicode(value)
421

    
422
    def set_param(self, name, value=None, iff=True):
423
        if iff:
424
            self.params[name] = unicode(value)
425

    
426
    def request(
427
            self, method, path,
428
            async_headers=dict(), async_params=dict(),
429
            **kwargs):
430
        """Commit an HTTP request to base_url/path
431
        Requests are commited to and performed by Request/ResponseManager
432
        These classes perform a lazy http request. Present method, by default,
433
        enforces them to perform the http call. Hint: call present method with
434
        success=None to get a non-performed ResponseManager object.
435
        """
436
        assert isinstance(method, str) or isinstance(method, unicode)
437
        assert method
438
        assert isinstance(path, str) or isinstance(path, unicode)
439
        try:
440
            headers = dict(self.headers)
441
            headers.update(async_headers)
442
            params = dict(self.params)
443
            params.update(async_params)
444
            success = kwargs.pop('success', 200)
445
            data = kwargs.pop('data', None)
446
            headers.setdefault('X-Auth-Token', self.token)
447
            if 'json' in kwargs:
448
                data = dumps(kwargs.pop('json'))
449
                headers.setdefault('Content-Type', 'application/json')
450
            if data:
451
                headers.setdefault('Content-Length', '%s' % len(data))
452

    
453
            plog = ('\t[%s]' % self) if self.LOG_PID else ''
454
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
455
            req = RequestManager(
456
                method, self.base_url, path,
457
                data=data, headers=headers, params=params)
458
            #  req.log()
459
            r = ResponseManager(
460
                req, connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
461
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
462
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
463
            r._token = headers['X-Auth-Token']
464
        finally:
465
            self.headers = dict()
466
            self.params = dict()
467

    
468
        if success is not None:
469
            # Success can either be an int or a collection
470
            success = (success,) if isinstance(success, int) else success
471
            if r.status_code not in success:
472
                self._raise_for_status(r)
473
        return r
474

    
475
    def delete(self, path, **kwargs):
476
        return self.request('delete', path, **kwargs)
477

    
478
    def get(self, path, **kwargs):
479
        return self.request('get', path, **kwargs)
480

    
481
    def head(self, path, **kwargs):
482
        return self.request('head', path, **kwargs)
483

    
484
    def post(self, path, **kwargs):
485
        return self.request('post', path, **kwargs)
486

    
487
    def put(self, path, **kwargs):
488
        return self.request('put', path, **kwargs)
489

    
490
    def copy(self, path, **kwargs):
491
        return self.request('copy', path, **kwargs)
492

    
493
    def move(self, path, **kwargs):
494
        return self.request('move', path, **kwargs)
495

    
496

    
497
class Waiter(object):
498

    
499
    def _wait(
500
            self, item_id, wait_status, get_status,
501
            delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
502
        """Wait while the item is still in wait_status or to reach it
503

504
        :param server_id: integer (str or int)
505

506
        :param wait_status: (str)
507

508
        :param get_status: (method(self, item_id)) if called, returns
509
            (status, progress %) If no way to tell progress, return None
510

511
        :param delay: time interval between retries
512

513
        :param wait_cb: (method(total steps)) returns a generator for
514
            reporting progress or timeouts i.e., for a progress bar
515

516
        :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)
517

518
        :returns: (str) the new mode if successful, (bool) False if timed out
519
        """
520
        status, progress = get_status(self, item_id)
521

    
522
        if wait_cb:
523
            wait_gen = wait_cb(max_wait // delay)
524
            wait_gen.next()
525

    
526
        if wait_for_status ^ (status != wait_status):
527
            # if wait_cb:
528
            #     try:
529
            #         wait_gen.next()
530
            #     except Exception:
531
            #         pass
532
            return status
533
        old_wait = total_wait = 0
534

    
535
        while (wait_for_status ^ (status == wait_status)) and (
536
                total_wait <= max_wait):
537
            if wait_cb:
538
                try:
539
                    for i in range(total_wait - old_wait):
540
                        wait_gen.next()
541
                except Exception:
542
                    break
543
            old_wait = total_wait
544
            total_wait = progress or total_wait + 1
545
            sleep(delay)
546
            status, progress = get_status(self, item_id)
547

    
548
        if total_wait < max_wait:
549
            if wait_cb:
550
                try:
551
                    for i in range(max_wait):
552
                        wait_gen.next()
553
                except:
554
                    pass
555
        return status if (wait_for_status ^ (status != wait_status)) else False
556

    
557
    def wait_for(
558
            self, item_id, target_status, get_status,
559
            delay=1, max_wait=100, wait_cb=None):
560
        self._wait(
561
            item_id, target_status, get_status, delay, max_wait, wait_cb,
562
            wait_for_status=True)
563

    
564
    def wait_while(
565
            self, item_id, target_status, get_status,
566
            delay=1, max_wait=100, wait_cb=None):
567
        self._wait(
568
            item_id, target_status, get_status, delay, max_wait, wait_cb,
569
            wait_for_status=False)