Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 58602137

History | View | Annotate | Download (17.7 kB)

1 e3f01d64 Stavros Sachtouris
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2 a1c50326 Giorgos Verigakis
#
3 a1c50326 Giorgos Verigakis
# Redistribution and use in source and binary forms, with or
4 a1c50326 Giorgos Verigakis
# without modification, are permitted provided that the following
5 a1c50326 Giorgos Verigakis
# conditions are met:
6 a1c50326 Giorgos Verigakis
#
7 a1c50326 Giorgos Verigakis
#   1. Redistributions of source code must retain the above
8 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
9 a1c50326 Giorgos Verigakis
#      disclaimer.
10 a1c50326 Giorgos Verigakis
#
11 a1c50326 Giorgos Verigakis
#   2. Redistributions in binary form must reproduce the above
12 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
13 a1c50326 Giorgos Verigakis
#      disclaimer in the documentation and/or other materials
14 a1c50326 Giorgos Verigakis
#      provided with the distribution.
15 a1c50326 Giorgos Verigakis
#
16 a1c50326 Giorgos Verigakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 a1c50326 Giorgos Verigakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 a1c50326 Giorgos Verigakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 a1c50326 Giorgos Verigakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 a1c50326 Giorgos Verigakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 a1c50326 Giorgos Verigakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 a1c50326 Giorgos Verigakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 a1c50326 Giorgos Verigakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 a1c50326 Giorgos Verigakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 a1c50326 Giorgos Verigakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 a1c50326 Giorgos Verigakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 a1c50326 Giorgos Verigakis
# POSSIBILITY OF SUCH DAMAGE.
28 a1c50326 Giorgos Verigakis
#
29 a1c50326 Giorgos Verigakis
# The views and conclusions contained in the software and
30 a1c50326 Giorgos Verigakis
# documentation are those of the authors and should not be
31 a1c50326 Giorgos Verigakis
# interpreted as representing official policies, either expressed
32 a1c50326 Giorgos Verigakis
# or implied, of GRNET S.A.
33 a1c50326 Giorgos Verigakis
34 545c6c29 Stavros Sachtouris
from urllib2 import quote, unquote
35 c2b5da2f Stavros Sachtouris
from urlparse import urlparse
36 2b74ab4a Stavros Sachtouris
from threading import Thread
37 de4f08ef Stavros Sachtouris
from json import dumps, loads
38 cad39033 Stavros Sachtouris
from time import time
39 ec928235 Stavros Sachtouris
from httplib import ResponseNotReady, HTTPException
40 c2b5da2f Stavros Sachtouris
from time import sleep
41 c2b5da2f Stavros Sachtouris
from random import random
42 c4d51ec9 Stavros Sachtouris
from logging import getLogger
43 c2b5da2f Stavros Sachtouris
44 c2b5da2f Stavros Sachtouris
from objpool.http import PooledHTTPConnection
45 61ca0ecd Stavros Sachtouris
46 6a0b1658 Giorgos Verigakis
47 21871fb2 Stavros Sachtouris
TIMEOUT = 60.0   # seconds
48 21871fb2 Stavros Sachtouris
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49 2406db97 Stavros Sachtouris
50 c4d51ec9 Stavros Sachtouris
log = getLogger(__name__)
51 c4d51ec9 Stavros Sachtouris
sendlog = getLogger('%s.send' % __name__)
52 c4d51ec9 Stavros Sachtouris
recvlog = getLogger('%s.recv' % __name__)
53 e8af27f4 Stavros Sachtouris
54 c2b5da2f Stavros Sachtouris
55 c2b5da2f Stavros Sachtouris
def _encode(v):
56 c2b5da2f Stavros Sachtouris
    if v and isinstance(v, unicode):
57 c2b5da2f Stavros Sachtouris
        return quote(v.encode('utf-8'))
58 c2b5da2f Stavros Sachtouris
    return v
59 c2b5da2f Stavros Sachtouris
60 3dabe5d2 Stavros Sachtouris
61 a1c50326 Giorgos Verigakis
class ClientError(Exception):
62 4f989909 Stavros Sachtouris
    def __init__(self, message, status=0, details=None):
63 e9db8806 Stavros Sachtouris
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64 008a5db5 Stavros Sachtouris
            message,
65 008a5db5 Stavros Sachtouris
            status,
66 008a5db5 Stavros Sachtouris
            details))
67 de4f08ef Stavros Sachtouris
        try:
68 1f417830 Stavros Sachtouris
            message += '' if message and message[-1] == '\n' else '\n'
69 062b1d0a Stavros Sachtouris
            serv_stat, sep, new_msg = message.partition('{')
70 f4de4c91 Stavros Sachtouris
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71 062b1d0a Stavros Sachtouris
            json_msg = loads(new_msg)
72 de4f08ef Stavros Sachtouris
            key = json_msg.keys()[0]
73 f4de4c91 Stavros Sachtouris
            serv_stat = serv_stat.strip()
74 062b1d0a Stavros Sachtouris
75 de4f08ef Stavros Sachtouris
            json_msg = json_msg[key]
76 f4de4c91 Stavros Sachtouris
            message = '%s %s (%s)\n' % (
77 f4de4c91 Stavros Sachtouris
                serv_stat,
78 f4de4c91 Stavros Sachtouris
                key,
79 f4de4c91 Stavros Sachtouris
                json_msg['message']) if (
80 f4de4c91 Stavros Sachtouris
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81 f4de4c91 Stavros Sachtouris
            status = json_msg.get('code', status)
82 de4f08ef Stavros Sachtouris
            if 'details' in json_msg:
83 de4f08ef Stavros Sachtouris
                if not details:
84 de4f08ef Stavros Sachtouris
                    details = []
85 f4de4c91 Stavros Sachtouris
                if not isinstance(details, list):
86 de4f08ef Stavros Sachtouris
                    details = [details]
87 de4f08ef Stavros Sachtouris
                if json_msg['details']:
88 de4f08ef Stavros Sachtouris
                    details.append(json_msg['details'])
89 f4de4c91 Stavros Sachtouris
        except Exception:
90 de4f08ef Stavros Sachtouris
            pass
91 f4de4c91 Stavros Sachtouris
        finally:
92 5dde4c83 Stavros Sachtouris
            while message.endswith('\n\n'):
93 5dde4c83 Stavros Sachtouris
                message = message[:-1]
94 f4de4c91 Stavros Sachtouris
            super(ClientError, self).__init__(message)
95 f4de4c91 Stavros Sachtouris
            self.status = status if isinstance(status, int) else 0
96 f4de4c91 Stavros Sachtouris
            self.details = details if details else []
97 a1c50326 Giorgos Verigakis
98 3dabe5d2 Stavros Sachtouris
99 5fdccdec Stavros Sachtouris
class Logged(object):
100 5fdccdec Stavros Sachtouris
101 5fdccdec Stavros Sachtouris
    LOG_TOKEN = False
102 5fdccdec Stavros Sachtouris
    LOG_DATA = False
103 64a3c0de Stavros Sachtouris
    LOG_PID = False
104 7f85a914 Stavros Sachtouris
    _token = None
105 5fdccdec Stavros Sachtouris
106 5fdccdec Stavros Sachtouris
107 5fdccdec Stavros Sachtouris
class RequestManager(Logged):
108 c2b5da2f Stavros Sachtouris
    """Handle http request information"""
109 c2b5da2f Stavros Sachtouris
110 c2b5da2f Stavros Sachtouris
    def _connection_info(self, url, path, params={}):
111 c2b5da2f Stavros Sachtouris
        """ Set self.url to scheme://netloc/?params
112 c2b5da2f Stavros Sachtouris
        :param url: (str or unicode) The service url
113 c2b5da2f Stavros Sachtouris

114 c2b5da2f Stavros Sachtouris
        :param path: (str or unicode) The service path (url/path)
115 c2b5da2f Stavros Sachtouris

116 c2b5da2f Stavros Sachtouris
        :param params: (dict) Parameters to add to final url
117 c2b5da2f Stavros Sachtouris

118 c2b5da2f Stavros Sachtouris
        :returns: (scheme, netloc)
119 c2b5da2f Stavros Sachtouris
        """
120 819311d3 Stavros Sachtouris
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
121 c2b5da2f Stavros Sachtouris
        url += '' if url.endswith('/') else '/'
122 c2b5da2f Stavros Sachtouris
        if path:
123 c2b5da2f Stavros Sachtouris
            url += _encode(path[1:] if path.startswith('/') else path)
124 7fa5c263 Stavros Sachtouris
        delim = '?'
125 7fa5c263 Stavros Sachtouris
        for key, val in params.items():
126 40b1ed91 Stavros Sachtouris
            val = '' if val in (None, False) else _encode(u'%s' % val)
127 7fa5c263 Stavros Sachtouris
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
128 7fa5c263 Stavros Sachtouris
            delim = '&'
129 c2b5da2f Stavros Sachtouris
        parsed = urlparse(url)
130 c2b5da2f Stavros Sachtouris
        self.url = url
131 c2b5da2f Stavros Sachtouris
        self.path = parsed.path or '/'
132 c2b5da2f Stavros Sachtouris
        if parsed.query:
133 c2b5da2f Stavros Sachtouris
            self.path += '?%s' % parsed.query
134 c2b5da2f Stavros Sachtouris
        return (parsed.scheme, parsed.netloc)
135 c2b5da2f Stavros Sachtouris
136 c2b5da2f Stavros Sachtouris
    def __init__(
137 c2b5da2f Stavros Sachtouris
            self, method, url, path,
138 c2b5da2f Stavros Sachtouris
            data=None, headers={}, params={}):
139 c2b5da2f Stavros Sachtouris
        method = method.upper()
140 c2b5da2f Stavros Sachtouris
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
141 c2b5da2f Stavros Sachtouris
        if headers:
142 c2b5da2f Stavros Sachtouris
            assert isinstance(headers, dict)
143 7fa5c263 Stavros Sachtouris
        self.headers = dict(headers)
144 c2b5da2f Stavros Sachtouris
        self.method, self.data = method, data
145 c2b5da2f Stavros Sachtouris
        self.scheme, self.netloc = self._connection_info(url, path, params)
146 c2b5da2f Stavros Sachtouris
147 c4563114 Stavros Sachtouris
    def dump_log(self):
148 85115c12 Stavros Sachtouris
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
149 64a3c0de Stavros Sachtouris
        sendlog.info('- -  -   -     -        -             -')
150 64a3c0de Stavros Sachtouris
        sendlog.info('%s %s://%s%s%s' % (
151 64a3c0de Stavros Sachtouris
            self.method, self.scheme, self.netloc, self.path, plog))
152 21871fb2 Stavros Sachtouris
        for key, val in self.headers.items():
153 7f85a914 Stavros Sachtouris
            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
154 7f85a914 Stavros Sachtouris
                self._token, val = val, '...'
155 7f85a914 Stavros Sachtouris
            sendlog.info('  %s: %s%s' % (key, val, plog))
156 21871fb2 Stavros Sachtouris
        if self.data:
157 64a3c0de Stavros Sachtouris
            sendlog.info('data size:%s%s' % (len(self.data), plog))
158 5fdccdec Stavros Sachtouris
            if self.LOG_DATA:
159 7f85a914 Stavros Sachtouris
                sendlog.info(self.data.replace(self._token, '...') if (
160 7f85a914 Stavros Sachtouris
                    self._token) else self.data)
161 21871fb2 Stavros Sachtouris
        else:
162 64a3c0de Stavros Sachtouris
            sendlog.info('data size:0%s' % plog)
163 21871fb2 Stavros Sachtouris
164 c2b5da2f Stavros Sachtouris
    def perform(self, conn):
165 c2b5da2f Stavros Sachtouris
        """
166 c2b5da2f Stavros Sachtouris
        :param conn: (httplib connection object)
167 c2b5da2f Stavros Sachtouris

168 c2b5da2f Stavros Sachtouris
        :returns: (HTTPResponse)
169 c2b5da2f Stavros Sachtouris
        """
170 fc79be92 Stavros Sachtouris
        self.dump_log()
171 c2b5da2f Stavros Sachtouris
        conn.request(
172 c2b5da2f Stavros Sachtouris
            method=str(self.method.upper()),
173 c2b5da2f Stavros Sachtouris
            url=str(self.path),
174 c2b5da2f Stavros Sachtouris
            headers=self.headers,
175 c2b5da2f Stavros Sachtouris
            body=self.data)
176 fc79be92 Stavros Sachtouris
        sendlog.info('')
177 f47417e7 Stavros Sachtouris
        keep_trying = TIMEOUT
178 f8eea8ec Stavros Sachtouris
        while keep_trying > 0:
179 c2b5da2f Stavros Sachtouris
            try:
180 c2b5da2f Stavros Sachtouris
                return conn.getresponse()
181 c2b5da2f Stavros Sachtouris
            except ResponseNotReady:
182 f8eea8ec Stavros Sachtouris
                wait = 0.03 * random()
183 f8eea8ec Stavros Sachtouris
                sleep(wait)
184 f8eea8ec Stavros Sachtouris
                keep_trying -= wait
185 85115c12 Stavros Sachtouris
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
186 64a3c0de Stavros Sachtouris
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
187 34b88989 Stavros Sachtouris
        recvlog.debug(logmsg)
188 f8eea8ec Stavros Sachtouris
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
189 c2b5da2f Stavros Sachtouris
190 c2b5da2f Stavros Sachtouris
191 5fdccdec Stavros Sachtouris
class ResponseManager(Logged):
192 c2b5da2f Stavros Sachtouris
    """Manage the http request and handle the response data, headers, etc."""
193 c2b5da2f Stavros Sachtouris
194 ec928235 Stavros Sachtouris
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
195 c2b5da2f Stavros Sachtouris
        """
196 c2b5da2f Stavros Sachtouris
        :param request: (RequestManager)
197 ec928235 Stavros Sachtouris

198 ec928235 Stavros Sachtouris
        :param poolsize: (int) the size of the connection pool
199 ec928235 Stavros Sachtouris

200 ec928235 Stavros Sachtouris
        :param connection_retry_limit: (int)
201 c2b5da2f Stavros Sachtouris
        """
202 ec928235 Stavros Sachtouris
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
203 c2b5da2f Stavros Sachtouris
        self.request = request
204 c2b5da2f Stavros Sachtouris
        self._request_performed = False
205 c2b5da2f Stavros Sachtouris
        self.poolsize = poolsize
206 c2b5da2f Stavros Sachtouris
207 c2b5da2f Stavros Sachtouris
    def _get_response(self):
208 c2b5da2f Stavros Sachtouris
        if self._request_performed:
209 c2b5da2f Stavros Sachtouris
            return
210 c2b5da2f Stavros Sachtouris
211 c2b5da2f Stavros Sachtouris
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
212 ec928235 Stavros Sachtouris
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
213 ec928235 Stavros Sachtouris
            try:
214 ec928235 Stavros Sachtouris
                with PooledHTTPConnection(
215 ec928235 Stavros Sachtouris
                        self.request.netloc, self.request.scheme,
216 ec928235 Stavros Sachtouris
                        **pool_kw) as connection:
217 ec928235 Stavros Sachtouris
                    self.request.LOG_TOKEN = self.LOG_TOKEN
218 ec928235 Stavros Sachtouris
                    self.request.LOG_DATA = self.LOG_DATA
219 64a3c0de Stavros Sachtouris
                    self.request.LOG_PID = self.LOG_PID
220 ec928235 Stavros Sachtouris
                    r = self.request.perform(connection)
221 64a3c0de Stavros Sachtouris
                    plog = ''
222 64a3c0de Stavros Sachtouris
                    if self.LOG_PID:
223 64a3c0de Stavros Sachtouris
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
224 64a3c0de Stavros Sachtouris
                            self, r, self.request))
225 64a3c0de Stavros Sachtouris
                        plog = '\t[%s]' % self
226 ec928235 Stavros Sachtouris
                    self._request_performed = True
227 ec928235 Stavros Sachtouris
                    self._status_code, self._status = r.status, unquote(
228 ec928235 Stavros Sachtouris
                        r.reason)
229 ec928235 Stavros Sachtouris
                    recvlog.info(
230 64a3c0de Stavros Sachtouris
                        '%d %s%s' % (
231 64a3c0de Stavros Sachtouris
                            self.status_code, self.status, plog))
232 ec928235 Stavros Sachtouris
                    self._headers = dict()
233 ec928235 Stavros Sachtouris
                    for k, v in r.getheaders():
234 7f85a914 Stavros Sachtouris
                        if k.lower in ('x-auth-token', ) and (
235 7f85a914 Stavros Sachtouris
                                not self.LOG_TOKEN):
236 7f85a914 Stavros Sachtouris
                            self._token, v = v, '...'
237 ec928235 Stavros Sachtouris
                        v = unquote(v)
238 ec928235 Stavros Sachtouris
                        self._headers[k] = v
239 7f85a914 Stavros Sachtouris
                        recvlog.info('  %s: %s%s' % (k, v, plog))
240 ec928235 Stavros Sachtouris
                    self._content = r.read()
241 64a3c0de Stavros Sachtouris
                    recvlog.info('data size: %s%s' % (
242 64a3c0de Stavros Sachtouris
                        len(self._content) if self._content else 0, plog))
243 ec928235 Stavros Sachtouris
                    if self.LOG_DATA and self._content:
244 7f85a914 Stavros Sachtouris
                        data = '%s%s' % (self._content, plog)
245 7f85a914 Stavros Sachtouris
                        if self._token:
246 7f85a914 Stavros Sachtouris
                            data = data.replace(self._token, '...')
247 85115c12 Stavros Sachtouris
                        recvlog.info(data)
248 85115c12 Stavros Sachtouris
                    recvlog.info('-             -        -     -   -  - -')
249 ec928235 Stavros Sachtouris
                break
250 ec928235 Stavros Sachtouris
            except Exception as err:
251 ec928235 Stavros Sachtouris
                if isinstance(err, HTTPException):
252 ec928235 Stavros Sachtouris
                    if retries >= self.CONNECTION_TRY_LIMIT:
253 ec928235 Stavros Sachtouris
                        raise ClientError(
254 ec928235 Stavros Sachtouris
                            'Connection to %s failed %s times (%s: %s )' % (
255 ec928235 Stavros Sachtouris
                                self.request.url, retries, type(err), err))
256 ec928235 Stavros Sachtouris
                else:
257 ec928235 Stavros Sachtouris
                    from traceback import format_stack
258 ec928235 Stavros Sachtouris
                    recvlog.debug(
259 ec928235 Stavros Sachtouris
                        '\n'.join(['%s' % type(err)] + format_stack()))
260 ec928235 Stavros Sachtouris
                    raise ClientError(
261 ec928235 Stavros Sachtouris
                        'Failed while http-connecting to %s (%s)' % (
262 64a3c0de Stavros Sachtouris
                            self.request.url, err))
263 c2b5da2f Stavros Sachtouris
264 c2b5da2f Stavros Sachtouris
    @property
265 c2b5da2f Stavros Sachtouris
    def status_code(self):
266 c2b5da2f Stavros Sachtouris
        self._get_response()
267 c2b5da2f Stavros Sachtouris
        return self._status_code
268 c2b5da2f Stavros Sachtouris
269 c2b5da2f Stavros Sachtouris
    @property
270 c2b5da2f Stavros Sachtouris
    def status(self):
271 c2b5da2f Stavros Sachtouris
        self._get_response()
272 c2b5da2f Stavros Sachtouris
        return self._status
273 c2b5da2f Stavros Sachtouris
274 c2b5da2f Stavros Sachtouris
    @property
275 c2b5da2f Stavros Sachtouris
    def headers(self):
276 c2b5da2f Stavros Sachtouris
        self._get_response()
277 c2b5da2f Stavros Sachtouris
        return self._headers
278 c2b5da2f Stavros Sachtouris
279 c2b5da2f Stavros Sachtouris
    @property
280 c2b5da2f Stavros Sachtouris
    def content(self):
281 c2b5da2f Stavros Sachtouris
        self._get_response()
282 c2b5da2f Stavros Sachtouris
        return self._content
283 c2b5da2f Stavros Sachtouris
284 c2b5da2f Stavros Sachtouris
    @property
285 c2b5da2f Stavros Sachtouris
    def text(self):
286 c2b5da2f Stavros Sachtouris
        """
287 c2b5da2f Stavros Sachtouris
        :returns: (str) content
288 c2b5da2f Stavros Sachtouris
        """
289 c2b5da2f Stavros Sachtouris
        self._get_response()
290 c2b5da2f Stavros Sachtouris
        return '%s' % self._content
291 c2b5da2f Stavros Sachtouris
292 c2b5da2f Stavros Sachtouris
    @property
293 c2b5da2f Stavros Sachtouris
    def json(self):
294 c2b5da2f Stavros Sachtouris
        """
295 c2b5da2f Stavros Sachtouris
        :returns: (dict) squeezed from json-formated content
296 c2b5da2f Stavros Sachtouris
        """
297 c2b5da2f Stavros Sachtouris
        self._get_response()
298 c2b5da2f Stavros Sachtouris
        try:
299 c2b5da2f Stavros Sachtouris
            return loads(self._content)
300 c2b5da2f Stavros Sachtouris
        except ValueError as err:
301 f8eea8ec Stavros Sachtouris
            raise ClientError('Response not formated in JSON - %s' % err)
302 c2b5da2f Stavros Sachtouris
303 c2b5da2f Stavros Sachtouris
304 2b74ab4a Stavros Sachtouris
class SilentEvent(Thread):
305 34b88989 Stavros Sachtouris
    """Thread-run method(*args, **kwargs)"""
306 2b74ab4a Stavros Sachtouris
    def __init__(self, method, *args, **kwargs):
307 2b74ab4a Stavros Sachtouris
        super(self.__class__, self).__init__()
308 2b74ab4a Stavros Sachtouris
        self.method = method
309 2b74ab4a Stavros Sachtouris
        self.args = args
310 2b74ab4a Stavros Sachtouris
        self.kwargs = kwargs
311 2b74ab4a Stavros Sachtouris
312 2b74ab4a Stavros Sachtouris
    @property
313 2b74ab4a Stavros Sachtouris
    def exception(self):
314 2b74ab4a Stavros Sachtouris
        return getattr(self, '_exception', False)
315 2b74ab4a Stavros Sachtouris
316 2b74ab4a Stavros Sachtouris
    @property
317 2b74ab4a Stavros Sachtouris
    def value(self):
318 2b74ab4a Stavros Sachtouris
        return getattr(self, '_value', None)
319 2b74ab4a Stavros Sachtouris
320 2b74ab4a Stavros Sachtouris
    def run(self):
321 2b74ab4a Stavros Sachtouris
        try:
322 2b74ab4a Stavros Sachtouris
            self._value = self.method(*(self.args), **(self.kwargs))
323 2b74ab4a Stavros Sachtouris
        except Exception as e:
324 7644c38e Stavros Sachtouris
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
325 7644c38e Stavros Sachtouris
                self,
326 7644c38e Stavros Sachtouris
                type(e),
327 7644c38e Stavros Sachtouris
                e.status if isinstance(e, ClientError) else '',
328 7644c38e Stavros Sachtouris
                e))
329 2b74ab4a Stavros Sachtouris
            self._exception = e
330 2b74ab4a Stavros Sachtouris
331 6069b53b Stavros Sachtouris
332 64a3c0de Stavros Sachtouris
class Client(Logged):
333 cad39033 Stavros Sachtouris
334 34b88989 Stavros Sachtouris
    MAX_THREADS = 7
335 b773795c Stavros Sachtouris
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
336 ec928235 Stavros Sachtouris
    CONNECTION_RETRY_LIMIT = 0
337 34b88989 Stavros Sachtouris
338 c2b5da2f Stavros Sachtouris
    def __init__(self, base_url, token):
339 528550d9 Stavros Sachtouris
        assert base_url, 'No base_url for client %s' % self
340 6a0b1658 Giorgos Verigakis
        self.base_url = base_url
341 6c62a96d Giorgos Verigakis
        self.token = token
342 c2b5da2f Stavros Sachtouris
        self.headers, self.params = dict(), dict()
343 6c62a96d Giorgos Verigakis
344 cad39033 Stavros Sachtouris
    def _init_thread_limit(self, limit=1):
345 9c6c3d69 Stavros Sachtouris
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
346 cad39033 Stavros Sachtouris
        self._thread_limit = limit
347 cad39033 Stavros Sachtouris
        self._elapsed_old = 0.0
348 cad39033 Stavros Sachtouris
        self._elapsed_new = 0.0
349 cad39033 Stavros Sachtouris
350 cad39033 Stavros Sachtouris
    def _watch_thread_limit(self, threadlist):
351 9c6c3d69 Stavros Sachtouris
        self._thread_limit = getattr(self, '_thread_limit', 1)
352 9c6c3d69 Stavros Sachtouris
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
353 9c6c3d69 Stavros Sachtouris
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
354 16c895db Stavros Sachtouris
        recvlog.debug('# running threads: %s' % len(threadlist))
355 9c6c3d69 Stavros Sachtouris
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
356 16b0afe6 Stavros Sachtouris
                self._thread_limit < self.MAX_THREADS):
357 2005b18e Stavros Sachtouris
            self._thread_limit += 1
358 9c6c3d69 Stavros Sachtouris
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
359 cad39033 Stavros Sachtouris
            self._thread_limit -= 1
360 cad39033 Stavros Sachtouris
361 cad39033 Stavros Sachtouris
        self._elapsed_old = self._elapsed_new
362 cad39033 Stavros Sachtouris
        if len(threadlist) >= self._thread_limit:
363 cad39033 Stavros Sachtouris
            self._elapsed_new = 0.0
364 cad39033 Stavros Sachtouris
            for thread in threadlist:
365 cad39033 Stavros Sachtouris
                begin_time = time()
366 cad39033 Stavros Sachtouris
                thread.join()
367 cad39033 Stavros Sachtouris
                self._elapsed_new += time() - begin_time
368 cad39033 Stavros Sachtouris
            self._elapsed_new = self._elapsed_new / len(threadlist)
369 cad39033 Stavros Sachtouris
            return []
370 cad39033 Stavros Sachtouris
        return threadlist
371 cad39033 Stavros Sachtouris
372 81c60832 Stavros Sachtouris
    def async_run(self, method, kwarg_list):
373 81c60832 Stavros Sachtouris
        """Fire threads of operations
374 81c60832 Stavros Sachtouris

375 81c60832 Stavros Sachtouris
        :param method: the method to run in each thread
376 81c60832 Stavros Sachtouris

377 81c60832 Stavros Sachtouris
        :param kwarg_list: (list of dicts) the arguments to pass in each method
378 81c60832 Stavros Sachtouris
            call
379 81c60832 Stavros Sachtouris

380 81c60832 Stavros Sachtouris
        :returns: (list) the results of each method call w.r. to the order of
381 81c60832 Stavros Sachtouris
            kwarg_list
382 81c60832 Stavros Sachtouris
        """
383 81c60832 Stavros Sachtouris
        flying, results = {}, {}
384 81c60832 Stavros Sachtouris
        self._init_thread_limit()
385 81c60832 Stavros Sachtouris
        for index, kwargs in enumerate(kwarg_list):
386 81c60832 Stavros Sachtouris
            self._watch_thread_limit(flying.values())
387 81c60832 Stavros Sachtouris
            flying[index] = SilentEvent(method=method, **kwargs)
388 81c60832 Stavros Sachtouris
            flying[index].start()
389 81c60832 Stavros Sachtouris
            unfinished = {}
390 81c60832 Stavros Sachtouris
            for key, thread in flying.items():
391 81c60832 Stavros Sachtouris
                if thread.isAlive():
392 81c60832 Stavros Sachtouris
                    unfinished[key] = thread
393 81c60832 Stavros Sachtouris
                elif thread.exception:
394 81c60832 Stavros Sachtouris
                    raise thread.exception
395 81c60832 Stavros Sachtouris
                else:
396 81c60832 Stavros Sachtouris
                    results[key] = thread.value
397 81c60832 Stavros Sachtouris
            flying = unfinished
398 81c60832 Stavros Sachtouris
        sendlog.info('- - - wait for threads to finish')
399 81c60832 Stavros Sachtouris
        for key, thread in flying.items():
400 81c60832 Stavros Sachtouris
            if thread.isAlive():
401 81c60832 Stavros Sachtouris
                thread.join()
402 c2e8d493 Stavros Sachtouris
            if thread.exception:
403 81c60832 Stavros Sachtouris
                raise thread.exception
404 40ddc207 Stavros Sachtouris
            results[key] = thread.value
405 81c60832 Stavros Sachtouris
        return results.values()
406 81c60832 Stavros Sachtouris
407 6ad245d5 Stavros Sachtouris
    def _raise_for_status(self, r):
408 e9db8806 Stavros Sachtouris
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
409 5dde4c83 Stavros Sachtouris
        status_msg = getattr(r, 'status', None) or ''
410 d804de82 Stavros Sachtouris
        try:
411 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r.text)
412 d804de82 Stavros Sachtouris
        except:
413 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r)
414 7966ffb8 Stavros Sachtouris
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
415 7966ffb8 Stavros Sachtouris
        raise ClientError(message, status=status)
416 6a0b1658 Giorgos Verigakis
417 4adfa919 Stavros Sachtouris
    def set_header(self, name, value, iff=True):
418 3dabe5d2 Stavros Sachtouris
        """Set a header 'name':'value'"""
419 4adfa919 Stavros Sachtouris
        if value is not None and iff:
420 82903313 Stavros Sachtouris
            self.headers[name] = unicode(value)
421 2f749e6e Stavros Sachtouris
422 2f749e6e Stavros Sachtouris
    def set_param(self, name, value=None, iff=True):
423 2f749e6e Stavros Sachtouris
        if iff:
424 82903313 Stavros Sachtouris
            self.params[name] = unicode(value)
425 2f749e6e Stavros Sachtouris
426 de73876b Stavros Sachtouris
    def request(
427 c2b5da2f Stavros Sachtouris
            self, method, path,
428 c2b5da2f Stavros Sachtouris
            async_headers=dict(), async_params=dict(),
429 24ff0a35 Stavros Sachtouris
            **kwargs):
430 34b88989 Stavros Sachtouris
        """Commit an HTTP request to base_url/path
431 34b88989 Stavros Sachtouris
        Requests are commited to and performed by Request/ResponseManager
432 34b88989 Stavros Sachtouris
        These classes perform a lazy http request. Present method, by default,
433 34b88989 Stavros Sachtouris
        enforces them to perform the http call. Hint: call present method with
434 34b88989 Stavros Sachtouris
        success=None to get a non-performed ResponseManager object.
435 34b88989 Stavros Sachtouris
        """
436 6a6175c0 Stavros Sachtouris
        assert isinstance(method, str) or isinstance(method, unicode)
437 6a6175c0 Stavros Sachtouris
        assert method
438 6a6175c0 Stavros Sachtouris
        assert isinstance(path, str) or isinstance(path, unicode)
439 d726b3d0 Stavros Sachtouris
        try:
440 c2b5da2f Stavros Sachtouris
            headers = dict(self.headers)
441 c2b5da2f Stavros Sachtouris
            headers.update(async_headers)
442 c2b5da2f Stavros Sachtouris
            params = dict(self.params)
443 c2b5da2f Stavros Sachtouris
            params.update(async_params)
444 2f749e6e Stavros Sachtouris
            success = kwargs.pop('success', 200)
445 2f749e6e Stavros Sachtouris
            data = kwargs.pop('data', None)
446 c2b5da2f Stavros Sachtouris
            headers.setdefault('X-Auth-Token', self.token)
447 2f749e6e Stavros Sachtouris
            if 'json' in kwargs:
448 de4f08ef Stavros Sachtouris
                data = dumps(kwargs.pop('json'))
449 c2b5da2f Stavros Sachtouris
                headers.setdefault('Content-Type', 'application/json')
450 2f749e6e Stavros Sachtouris
            if data:
451 c2b5da2f Stavros Sachtouris
                headers.setdefault('Content-Length', '%s' % len(data))
452 c2b5da2f Stavros Sachtouris
453 85115c12 Stavros Sachtouris
            plog = ('\t[%s]' % self) if self.LOG_PID else ''
454 64a3c0de Stavros Sachtouris
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
455 c2b5da2f Stavros Sachtouris
            req = RequestManager(
456 c2b5da2f Stavros Sachtouris
                method, self.base_url, path,
457 c2b5da2f Stavros Sachtouris
                data=data, headers=headers, params=params)
458 21871fb2 Stavros Sachtouris
            #  req.log()
459 ec928235 Stavros Sachtouris
            r = ResponseManager(
460 ec928235 Stavros Sachtouris
                req, connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
461 64a3c0de Stavros Sachtouris
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
462 64a3c0de Stavros Sachtouris
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
463 7f85a914 Stavros Sachtouris
            r._token = headers['X-Auth-Token']
464 6a6175c0 Stavros Sachtouris
        finally:
465 c2b5da2f Stavros Sachtouris
            self.headers = dict()
466 c2b5da2f Stavros Sachtouris
            self.params = dict()
467 0238c167 Stavros Sachtouris
468 0238c167 Stavros Sachtouris
        if success is not None:
469 a037fd61 Stavros Sachtouris
            # Success can either be an int or a collection
470 0238c167 Stavros Sachtouris
            success = (success,) if isinstance(success, int) else success
471 0238c167 Stavros Sachtouris
            if r.status_code not in success:
472 0238c167 Stavros Sachtouris
                self._raise_for_status(r)
473 d726b3d0 Stavros Sachtouris
        return r
474 a1c50326 Giorgos Verigakis
475 6a0b1658 Giorgos Verigakis
    def delete(self, path, **kwargs):
476 6a0b1658 Giorgos Verigakis
        return self.request('delete', path, **kwargs)
477 6a0b1658 Giorgos Verigakis
478 6a0b1658 Giorgos Verigakis
    def get(self, path, **kwargs):
479 6a0b1658 Giorgos Verigakis
        return self.request('get', path, **kwargs)
480 6a0b1658 Giorgos Verigakis
481 6a0b1658 Giorgos Verigakis
    def head(self, path, **kwargs):
482 6a0b1658 Giorgos Verigakis
        return self.request('head', path, **kwargs)
483 6a0b1658 Giorgos Verigakis
484 6a0b1658 Giorgos Verigakis
    def post(self, path, **kwargs):
485 6a0b1658 Giorgos Verigakis
        return self.request('post', path, **kwargs)
486 6a0b1658 Giorgos Verigakis
487 6a0b1658 Giorgos Verigakis
    def put(self, path, **kwargs):
488 6a0b1658 Giorgos Verigakis
        return self.request('put', path, **kwargs)
489 6a0b1658 Giorgos Verigakis
490 4adfa919 Stavros Sachtouris
    def copy(self, path, **kwargs):
491 4adfa919 Stavros Sachtouris
        return self.request('copy', path, **kwargs)
492 4adfa919 Stavros Sachtouris
493 4adfa919 Stavros Sachtouris
    def move(self, path, **kwargs):
494 4adfa919 Stavros Sachtouris
        return self.request('move', path, **kwargs)