Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ a304ef33

History | View | Annotate | Download (20.5 kB)

1 e3f01d64 Stavros Sachtouris
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2 a1c50326 Giorgos Verigakis
#
3 a1c50326 Giorgos Verigakis
# Redistribution and use in source and binary forms, with or
4 a1c50326 Giorgos Verigakis
# without modification, are permitted provided that the following
5 a1c50326 Giorgos Verigakis
# conditions are met:
6 a1c50326 Giorgos Verigakis
#
7 a1c50326 Giorgos Verigakis
#   1. Redistributions of source code must retain the above
8 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
9 a1c50326 Giorgos Verigakis
#      disclaimer.
10 a1c50326 Giorgos Verigakis
#
11 a1c50326 Giorgos Verigakis
#   2. Redistributions in binary form must reproduce the above
12 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
13 a1c50326 Giorgos Verigakis
#      disclaimer in the documentation and/or other materials
14 a1c50326 Giorgos Verigakis
#      provided with the distribution.
15 a1c50326 Giorgos Verigakis
#
16 a1c50326 Giorgos Verigakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 a1c50326 Giorgos Verigakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 a1c50326 Giorgos Verigakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 a1c50326 Giorgos Verigakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 a1c50326 Giorgos Verigakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 a1c50326 Giorgos Verigakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 a1c50326 Giorgos Verigakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 a1c50326 Giorgos Verigakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 a1c50326 Giorgos Verigakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 a1c50326 Giorgos Verigakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 a1c50326 Giorgos Verigakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 a1c50326 Giorgos Verigakis
# POSSIBILITY OF SUCH DAMAGE.
28 a1c50326 Giorgos Verigakis
#
29 a1c50326 Giorgos Verigakis
# The views and conclusions contained in the software and
30 a1c50326 Giorgos Verigakis
# documentation are those of the authors and should not be
31 a1c50326 Giorgos Verigakis
# interpreted as representing official policies, either expressed
32 a1c50326 Giorgos Verigakis
# or implied, of GRNET S.A.
33 a1c50326 Giorgos Verigakis
34 545c6c29 Stavros Sachtouris
from urllib2 import quote, unquote
35 c2b5da2f Stavros Sachtouris
from urlparse import urlparse
36 2b74ab4a Stavros Sachtouris
from threading import Thread
37 de4f08ef Stavros Sachtouris
from json import dumps, loads
38 cad39033 Stavros Sachtouris
from time import time
39 ec928235 Stavros Sachtouris
from httplib import ResponseNotReady, HTTPException
40 c2b5da2f Stavros Sachtouris
from time import sleep
41 c2b5da2f Stavros Sachtouris
from random import random
42 c4d51ec9 Stavros Sachtouris
from logging import getLogger
43 c2b5da2f Stavros Sachtouris
44 c2b5da2f Stavros Sachtouris
from objpool.http import PooledHTTPConnection
45 61ca0ecd Stavros Sachtouris
46 6a0b1658 Giorgos Verigakis
47 21871fb2 Stavros Sachtouris
TIMEOUT = 60.0   # seconds
48 21871fb2 Stavros Sachtouris
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49 2406db97 Stavros Sachtouris
50 c4d51ec9 Stavros Sachtouris
log = getLogger(__name__)
51 c4d51ec9 Stavros Sachtouris
sendlog = getLogger('%s.send' % __name__)
52 c4d51ec9 Stavros Sachtouris
recvlog = getLogger('%s.recv' % __name__)
53 e8af27f4 Stavros Sachtouris
54 c2b5da2f Stavros Sachtouris
55 c2b5da2f Stavros Sachtouris
def _encode(v):
56 c2b5da2f Stavros Sachtouris
    if v and isinstance(v, unicode):
57 c2b5da2f Stavros Sachtouris
        return quote(v.encode('utf-8'))
58 c2b5da2f Stavros Sachtouris
    return v
59 c2b5da2f Stavros Sachtouris
60 3dabe5d2 Stavros Sachtouris
61 a1c50326 Giorgos Verigakis
class ClientError(Exception):
62 4f989909 Stavros Sachtouris
    def __init__(self, message, status=0, details=None):
63 e9db8806 Stavros Sachtouris
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64 008a5db5 Stavros Sachtouris
            message,
65 008a5db5 Stavros Sachtouris
            status,
66 008a5db5 Stavros Sachtouris
            details))
67 de4f08ef Stavros Sachtouris
        try:
68 1f417830 Stavros Sachtouris
            message += '' if message and message[-1] == '\n' else '\n'
69 062b1d0a Stavros Sachtouris
            serv_stat, sep, new_msg = message.partition('{')
70 f4de4c91 Stavros Sachtouris
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71 062b1d0a Stavros Sachtouris
            json_msg = loads(new_msg)
72 de4f08ef Stavros Sachtouris
            key = json_msg.keys()[0]
73 f4de4c91 Stavros Sachtouris
            serv_stat = serv_stat.strip()
74 062b1d0a Stavros Sachtouris
75 de4f08ef Stavros Sachtouris
            json_msg = json_msg[key]
76 f4de4c91 Stavros Sachtouris
            message = '%s %s (%s)\n' % (
77 f4de4c91 Stavros Sachtouris
                serv_stat,
78 f4de4c91 Stavros Sachtouris
                key,
79 f4de4c91 Stavros Sachtouris
                json_msg['message']) if (
80 f4de4c91 Stavros Sachtouris
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81 f4de4c91 Stavros Sachtouris
            status = json_msg.get('code', status)
82 de4f08ef Stavros Sachtouris
            if 'details' in json_msg:
83 de4f08ef Stavros Sachtouris
                if not details:
84 de4f08ef Stavros Sachtouris
                    details = []
85 f4de4c91 Stavros Sachtouris
                if not isinstance(details, list):
86 de4f08ef Stavros Sachtouris
                    details = [details]
87 de4f08ef Stavros Sachtouris
                if json_msg['details']:
88 de4f08ef Stavros Sachtouris
                    details.append(json_msg['details'])
89 f4de4c91 Stavros Sachtouris
        except Exception:
90 de4f08ef Stavros Sachtouris
            pass
91 f4de4c91 Stavros Sachtouris
        finally:
92 5dde4c83 Stavros Sachtouris
            while message.endswith('\n\n'):
93 5dde4c83 Stavros Sachtouris
                message = message[:-1]
94 f4de4c91 Stavros Sachtouris
            super(ClientError, self).__init__(message)
95 f4de4c91 Stavros Sachtouris
            self.status = status if isinstance(status, int) else 0
96 f4de4c91 Stavros Sachtouris
            self.details = details if details else []
97 a1c50326 Giorgos Verigakis
98 3dabe5d2 Stavros Sachtouris
99 5fdccdec Stavros Sachtouris
class Logged(object):
100 5fdccdec Stavros Sachtouris
101 5fdccdec Stavros Sachtouris
    LOG_TOKEN = False
102 5fdccdec Stavros Sachtouris
    LOG_DATA = False
103 64a3c0de Stavros Sachtouris
    LOG_PID = False
104 7f85a914 Stavros Sachtouris
    _token = None
105 5fdccdec Stavros Sachtouris
106 5fdccdec Stavros Sachtouris
107 5fdccdec Stavros Sachtouris
class RequestManager(Logged):
108 c2b5da2f Stavros Sachtouris
    """Handle http request information"""
109 c2b5da2f Stavros Sachtouris
110 c2b5da2f Stavros Sachtouris
    def _connection_info(self, url, path, params={}):
111 c2b5da2f Stavros Sachtouris
        """ Set self.url to scheme://netloc/?params
112 c2b5da2f Stavros Sachtouris
        :param url: (str or unicode) The service url
113 c2b5da2f Stavros Sachtouris

114 c2b5da2f Stavros Sachtouris
        :param path: (str or unicode) The service path (url/path)
115 c2b5da2f Stavros Sachtouris

116 c2b5da2f Stavros Sachtouris
        :param params: (dict) Parameters to add to final url
117 c2b5da2f Stavros Sachtouris

118 c2b5da2f Stavros Sachtouris
        :returns: (scheme, netloc)
119 c2b5da2f Stavros Sachtouris
        """
120 819311d3 Stavros Sachtouris
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
121 c2b5da2f Stavros Sachtouris
        url += '' if url.endswith('/') else '/'
122 c2b5da2f Stavros Sachtouris
        if path:
123 c2b5da2f Stavros Sachtouris
            url += _encode(path[1:] if path.startswith('/') else path)
124 7fa5c263 Stavros Sachtouris
        delim = '?'
125 7fa5c263 Stavros Sachtouris
        for key, val in params.items():
126 0d3785a1 Stavros Sachtouris
            val = quote('' if val in (None, False) else _encode('%s' % val))
127 7fa5c263 Stavros Sachtouris
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
128 7fa5c263 Stavros Sachtouris
            delim = '&'
129 c2b5da2f Stavros Sachtouris
        parsed = urlparse(url)
130 c2b5da2f Stavros Sachtouris
        self.url = url
131 c2b5da2f Stavros Sachtouris
        self.path = parsed.path or '/'
132 c2b5da2f Stavros Sachtouris
        if parsed.query:
133 c2b5da2f Stavros Sachtouris
            self.path += '?%s' % parsed.query
134 c2b5da2f Stavros Sachtouris
        return (parsed.scheme, parsed.netloc)
135 c2b5da2f Stavros Sachtouris
136 c2b5da2f Stavros Sachtouris
    def __init__(
137 c2b5da2f Stavros Sachtouris
            self, method, url, path,
138 c2b5da2f Stavros Sachtouris
            data=None, headers={}, params={}):
139 c2b5da2f Stavros Sachtouris
        method = method.upper()
140 c2b5da2f Stavros Sachtouris
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
141 c2b5da2f Stavros Sachtouris
        if headers:
142 c2b5da2f Stavros Sachtouris
            assert isinstance(headers, dict)
143 7fa5c263 Stavros Sachtouris
        self.headers = dict(headers)
144 c2b5da2f Stavros Sachtouris
        self.method, self.data = method, data
145 c2b5da2f Stavros Sachtouris
        self.scheme, self.netloc = self._connection_info(url, path, params)
146 c2b5da2f Stavros Sachtouris
147 c4563114 Stavros Sachtouris
    def dump_log(self):
148 85115c12 Stavros Sachtouris
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
149 64a3c0de Stavros Sachtouris
        sendlog.info('- -  -   -     -        -             -')
150 64a3c0de Stavros Sachtouris
        sendlog.info('%s %s://%s%s%s' % (
151 64a3c0de Stavros Sachtouris
            self.method, self.scheme, self.netloc, self.path, plog))
152 21871fb2 Stavros Sachtouris
        for key, val in self.headers.items():
153 7f85a914 Stavros Sachtouris
            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
154 7f85a914 Stavros Sachtouris
                self._token, val = val, '...'
155 7f85a914 Stavros Sachtouris
            sendlog.info('  %s: %s%s' % (key, val, plog))
156 21871fb2 Stavros Sachtouris
        if self.data:
157 05ecf3a3 Stavros Sachtouris
            sendlog.info('data size: %s%s' % (len(self.data), plog))
158 5fdccdec Stavros Sachtouris
            if self.LOG_DATA:
159 7f85a914 Stavros Sachtouris
                sendlog.info(self.data.replace(self._token, '...') if (
160 7f85a914 Stavros Sachtouris
                    self._token) else self.data)
161 21871fb2 Stavros Sachtouris
        else:
162 05ecf3a3 Stavros Sachtouris
            sendlog.info('data size: 0%s' % plog)
163 21871fb2 Stavros Sachtouris
164 6c6abf6e Stavros Sachtouris
    def _encode_headers(self):
165 6c6abf6e Stavros Sachtouris
        headers = self.headers
166 6c6abf6e Stavros Sachtouris
        for k, v in self.headers.items():
167 6c6abf6e Stavros Sachtouris
            headers[k] = quote(v)
168 6c6abf6e Stavros Sachtouris
        self.headers = headers
169 6c6abf6e Stavros Sachtouris
170 c2b5da2f Stavros Sachtouris
    def perform(self, conn):
171 c2b5da2f Stavros Sachtouris
        """
172 c2b5da2f Stavros Sachtouris
        :param conn: (httplib connection object)
173 c2b5da2f Stavros Sachtouris

174 c2b5da2f Stavros Sachtouris
        :returns: (HTTPResponse)
175 c2b5da2f Stavros Sachtouris
        """
176 6c6abf6e Stavros Sachtouris
        self._encode_headers()
177 a304ef33 Stavros Sachtouris
        self.dump_log()
178 c2b5da2f Stavros Sachtouris
        conn.request(
179 c2b5da2f Stavros Sachtouris
            method=str(self.method.upper()),
180 c2b5da2f Stavros Sachtouris
            url=str(self.path),
181 c2b5da2f Stavros Sachtouris
            headers=self.headers,
182 c2b5da2f Stavros Sachtouris
            body=self.data)
183 fc79be92 Stavros Sachtouris
        sendlog.info('')
184 f47417e7 Stavros Sachtouris
        keep_trying = TIMEOUT
185 f8eea8ec Stavros Sachtouris
        while keep_trying > 0:
186 c2b5da2f Stavros Sachtouris
            try:
187 c2b5da2f Stavros Sachtouris
                return conn.getresponse()
188 c2b5da2f Stavros Sachtouris
            except ResponseNotReady:
189 f8eea8ec Stavros Sachtouris
                wait = 0.03 * random()
190 f8eea8ec Stavros Sachtouris
                sleep(wait)
191 f8eea8ec Stavros Sachtouris
                keep_trying -= wait
192 85115c12 Stavros Sachtouris
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
193 64a3c0de Stavros Sachtouris
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
194 34b88989 Stavros Sachtouris
        recvlog.debug(logmsg)
195 f8eea8ec Stavros Sachtouris
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
196 c2b5da2f Stavros Sachtouris
197 c2b5da2f Stavros Sachtouris
198 5fdccdec Stavros Sachtouris
class ResponseManager(Logged):
199 c2b5da2f Stavros Sachtouris
    """Manage the http request and handle the response data, headers, etc."""
200 c2b5da2f Stavros Sachtouris
201 ec928235 Stavros Sachtouris
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
202 c2b5da2f Stavros Sachtouris
        """
203 c2b5da2f Stavros Sachtouris
        :param request: (RequestManager)
204 ec928235 Stavros Sachtouris

205 ec928235 Stavros Sachtouris
        :param poolsize: (int) the size of the connection pool
206 ec928235 Stavros Sachtouris

207 ec928235 Stavros Sachtouris
        :param connection_retry_limit: (int)
208 c2b5da2f Stavros Sachtouris
        """
209 ec928235 Stavros Sachtouris
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
210 c2b5da2f Stavros Sachtouris
        self.request = request
211 c2b5da2f Stavros Sachtouris
        self._request_performed = False
212 c2b5da2f Stavros Sachtouris
        self.poolsize = poolsize
213 c2b5da2f Stavros Sachtouris
214 c2b5da2f Stavros Sachtouris
    def _get_response(self):
215 c2b5da2f Stavros Sachtouris
        if self._request_performed:
216 c2b5da2f Stavros Sachtouris
            return
217 c2b5da2f Stavros Sachtouris
218 c2b5da2f Stavros Sachtouris
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
219 ec928235 Stavros Sachtouris
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
220 ec928235 Stavros Sachtouris
            try:
221 ec928235 Stavros Sachtouris
                with PooledHTTPConnection(
222 ec928235 Stavros Sachtouris
                        self.request.netloc, self.request.scheme,
223 ec928235 Stavros Sachtouris
                        **pool_kw) as connection:
224 ec928235 Stavros Sachtouris
                    self.request.LOG_TOKEN = self.LOG_TOKEN
225 ec928235 Stavros Sachtouris
                    self.request.LOG_DATA = self.LOG_DATA
226 64a3c0de Stavros Sachtouris
                    self.request.LOG_PID = self.LOG_PID
227 ec928235 Stavros Sachtouris
                    r = self.request.perform(connection)
228 64a3c0de Stavros Sachtouris
                    plog = ''
229 64a3c0de Stavros Sachtouris
                    if self.LOG_PID:
230 64a3c0de Stavros Sachtouris
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
231 64a3c0de Stavros Sachtouris
                            self, r, self.request))
232 64a3c0de Stavros Sachtouris
                        plog = '\t[%s]' % self
233 ec928235 Stavros Sachtouris
                    self._request_performed = True
234 ec928235 Stavros Sachtouris
                    self._status_code, self._status = r.status, unquote(
235 ec928235 Stavros Sachtouris
                        r.reason)
236 ec928235 Stavros Sachtouris
                    recvlog.info(
237 64a3c0de Stavros Sachtouris
                        '%d %s%s' % (
238 64a3c0de Stavros Sachtouris
                            self.status_code, self.status, plog))
239 ec928235 Stavros Sachtouris
                    self._headers = dict()
240 ec928235 Stavros Sachtouris
                    for k, v in r.getheaders():
241 7f85a914 Stavros Sachtouris
                        if k.lower in ('x-auth-token', ) and (
242 7f85a914 Stavros Sachtouris
                                not self.LOG_TOKEN):
243 7f85a914 Stavros Sachtouris
                            self._token, v = v, '...'
244 2f302751 Stavros Sachtouris
                        v = unquote(v).decode('utf-8')
245 ec928235 Stavros Sachtouris
                        self._headers[k] = v
246 7f85a914 Stavros Sachtouris
                        recvlog.info('  %s: %s%s' % (k, v, plog))
247 ec928235 Stavros Sachtouris
                    self._content = r.read()
248 64a3c0de Stavros Sachtouris
                    recvlog.info('data size: %s%s' % (
249 64a3c0de Stavros Sachtouris
                        len(self._content) if self._content else 0, plog))
250 ec928235 Stavros Sachtouris
                    if self.LOG_DATA and self._content:
251 7f85a914 Stavros Sachtouris
                        data = '%s%s' % (self._content, plog)
252 7f85a914 Stavros Sachtouris
                        if self._token:
253 7f85a914 Stavros Sachtouris
                            data = data.replace(self._token, '...')
254 85115c12 Stavros Sachtouris
                        recvlog.info(data)
255 85115c12 Stavros Sachtouris
                    recvlog.info('-             -        -     -   -  - -')
256 ec928235 Stavros Sachtouris
                break
257 ec928235 Stavros Sachtouris
            except Exception as err:
258 ec928235 Stavros Sachtouris
                if isinstance(err, HTTPException):
259 ec928235 Stavros Sachtouris
                    if retries >= self.CONNECTION_TRY_LIMIT:
260 ec928235 Stavros Sachtouris
                        raise ClientError(
261 ec928235 Stavros Sachtouris
                            'Connection to %s failed %s times (%s: %s )' % (
262 ec928235 Stavros Sachtouris
                                self.request.url, retries, type(err), err))
263 ec928235 Stavros Sachtouris
                else:
264 ec928235 Stavros Sachtouris
                    from traceback import format_stack
265 ec928235 Stavros Sachtouris
                    recvlog.debug(
266 ec928235 Stavros Sachtouris
                        '\n'.join(['%s' % type(err)] + format_stack()))
267 f0bddbda Stavros Sachtouris
                    raise
268 ec928235 Stavros Sachtouris
                    raise ClientError(
269 ec928235 Stavros Sachtouris
                        'Failed while http-connecting to %s (%s)' % (
270 64a3c0de Stavros Sachtouris
                            self.request.url, err))
271 c2b5da2f Stavros Sachtouris
272 c2b5da2f Stavros Sachtouris
    @property
273 c2b5da2f Stavros Sachtouris
    def status_code(self):
274 c2b5da2f Stavros Sachtouris
        self._get_response()
275 c2b5da2f Stavros Sachtouris
        return self._status_code
276 c2b5da2f Stavros Sachtouris
277 c2b5da2f Stavros Sachtouris
    @property
278 c2b5da2f Stavros Sachtouris
    def status(self):
279 c2b5da2f Stavros Sachtouris
        self._get_response()
280 c2b5da2f Stavros Sachtouris
        return self._status
281 c2b5da2f Stavros Sachtouris
282 c2b5da2f Stavros Sachtouris
    @property
283 c2b5da2f Stavros Sachtouris
    def headers(self):
284 c2b5da2f Stavros Sachtouris
        self._get_response()
285 c2b5da2f Stavros Sachtouris
        return self._headers
286 c2b5da2f Stavros Sachtouris
287 c2b5da2f Stavros Sachtouris
    @property
288 c2b5da2f Stavros Sachtouris
    def content(self):
289 c2b5da2f Stavros Sachtouris
        self._get_response()
290 c2b5da2f Stavros Sachtouris
        return self._content
291 c2b5da2f Stavros Sachtouris
292 c2b5da2f Stavros Sachtouris
    @property
293 c2b5da2f Stavros Sachtouris
    def text(self):
294 c2b5da2f Stavros Sachtouris
        """
295 c2b5da2f Stavros Sachtouris
        :returns: (str) content
296 c2b5da2f Stavros Sachtouris
        """
297 c2b5da2f Stavros Sachtouris
        self._get_response()
298 c2b5da2f Stavros Sachtouris
        return '%s' % self._content
299 c2b5da2f Stavros Sachtouris
300 c2b5da2f Stavros Sachtouris
    @property
301 c2b5da2f Stavros Sachtouris
    def json(self):
302 c2b5da2f Stavros Sachtouris
        """
303 c2b5da2f Stavros Sachtouris
        :returns: (dict) squeezed from json-formated content
304 c2b5da2f Stavros Sachtouris
        """
305 c2b5da2f Stavros Sachtouris
        self._get_response()
306 c2b5da2f Stavros Sachtouris
        try:
307 c2b5da2f Stavros Sachtouris
            return loads(self._content)
308 c2b5da2f Stavros Sachtouris
        except ValueError as err:
309 f8eea8ec Stavros Sachtouris
            raise ClientError('Response not formated in JSON - %s' % err)
310 c2b5da2f Stavros Sachtouris
311 c2b5da2f Stavros Sachtouris
312 2b74ab4a Stavros Sachtouris
class SilentEvent(Thread):
313 34b88989 Stavros Sachtouris
    """Thread-run method(*args, **kwargs)"""
314 2b74ab4a Stavros Sachtouris
    def __init__(self, method, *args, **kwargs):
315 2b74ab4a Stavros Sachtouris
        super(self.__class__, self).__init__()
316 2b74ab4a Stavros Sachtouris
        self.method = method
317 2b74ab4a Stavros Sachtouris
        self.args = args
318 2b74ab4a Stavros Sachtouris
        self.kwargs = kwargs
319 2b74ab4a Stavros Sachtouris
320 2b74ab4a Stavros Sachtouris
    @property
321 2b74ab4a Stavros Sachtouris
    def exception(self):
322 2b74ab4a Stavros Sachtouris
        return getattr(self, '_exception', False)
323 2b74ab4a Stavros Sachtouris
324 2b74ab4a Stavros Sachtouris
    @property
325 2b74ab4a Stavros Sachtouris
    def value(self):
326 2b74ab4a Stavros Sachtouris
        return getattr(self, '_value', None)
327 2b74ab4a Stavros Sachtouris
328 2b74ab4a Stavros Sachtouris
    def run(self):
329 2b74ab4a Stavros Sachtouris
        try:
330 2b74ab4a Stavros Sachtouris
            self._value = self.method(*(self.args), **(self.kwargs))
331 2b74ab4a Stavros Sachtouris
        except Exception as e:
332 7644c38e Stavros Sachtouris
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
333 7644c38e Stavros Sachtouris
                self,
334 7644c38e Stavros Sachtouris
                type(e),
335 7644c38e Stavros Sachtouris
                e.status if isinstance(e, ClientError) else '',
336 7644c38e Stavros Sachtouris
                e))
337 2b74ab4a Stavros Sachtouris
            self._exception = e
338 2b74ab4a Stavros Sachtouris
339 6069b53b Stavros Sachtouris
340 64a3c0de Stavros Sachtouris
class Client(Logged):
341 cad39033 Stavros Sachtouris
342 ec5d658f Stavros Sachtouris
    MAX_THREADS = 1
343 b773795c Stavros Sachtouris
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
344 ec928235 Stavros Sachtouris
    CONNECTION_RETRY_LIMIT = 0
345 34b88989 Stavros Sachtouris
346 c2b5da2f Stavros Sachtouris
    def __init__(self, base_url, token):
347 528550d9 Stavros Sachtouris
        assert base_url, 'No base_url for client %s' % self
348 6a0b1658 Giorgos Verigakis
        self.base_url = base_url
349 6c62a96d Giorgos Verigakis
        self.token = token
350 c2b5da2f Stavros Sachtouris
        self.headers, self.params = dict(), dict()
351 61d579fb Stavros Sachtouris
        self.poolsize = None
352 6c62a96d Giorgos Verigakis
353 cad39033 Stavros Sachtouris
    def _init_thread_limit(self, limit=1):
354 9c6c3d69 Stavros Sachtouris
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
355 cad39033 Stavros Sachtouris
        self._thread_limit = limit
356 cad39033 Stavros Sachtouris
        self._elapsed_old = 0.0
357 cad39033 Stavros Sachtouris
        self._elapsed_new = 0.0
358 cad39033 Stavros Sachtouris
359 cad39033 Stavros Sachtouris
    def _watch_thread_limit(self, threadlist):
360 9c6c3d69 Stavros Sachtouris
        self._thread_limit = getattr(self, '_thread_limit', 1)
361 9c6c3d69 Stavros Sachtouris
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
362 9c6c3d69 Stavros Sachtouris
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
363 16c895db Stavros Sachtouris
        recvlog.debug('# running threads: %s' % len(threadlist))
364 9c6c3d69 Stavros Sachtouris
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
365 16b0afe6 Stavros Sachtouris
                self._thread_limit < self.MAX_THREADS):
366 2005b18e Stavros Sachtouris
            self._thread_limit += 1
367 9c6c3d69 Stavros Sachtouris
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
368 cad39033 Stavros Sachtouris
            self._thread_limit -= 1
369 cad39033 Stavros Sachtouris
370 cad39033 Stavros Sachtouris
        self._elapsed_old = self._elapsed_new
371 cad39033 Stavros Sachtouris
        if len(threadlist) >= self._thread_limit:
372 cad39033 Stavros Sachtouris
            self._elapsed_new = 0.0
373 cad39033 Stavros Sachtouris
            for thread in threadlist:
374 cad39033 Stavros Sachtouris
                begin_time = time()
375 cad39033 Stavros Sachtouris
                thread.join()
376 cad39033 Stavros Sachtouris
                self._elapsed_new += time() - begin_time
377 cad39033 Stavros Sachtouris
            self._elapsed_new = self._elapsed_new / len(threadlist)
378 cad39033 Stavros Sachtouris
            return []
379 cad39033 Stavros Sachtouris
        return threadlist
380 cad39033 Stavros Sachtouris
381 81c60832 Stavros Sachtouris
    def async_run(self, method, kwarg_list):
382 81c60832 Stavros Sachtouris
        """Fire threads of operations
383 81c60832 Stavros Sachtouris

384 81c60832 Stavros Sachtouris
        :param method: the method to run in each thread
385 81c60832 Stavros Sachtouris

386 81c60832 Stavros Sachtouris
        :param kwarg_list: (list of dicts) the arguments to pass in each method
387 81c60832 Stavros Sachtouris
            call
388 81c60832 Stavros Sachtouris

389 81c60832 Stavros Sachtouris
        :returns: (list) the results of each method call w.r. to the order of
390 81c60832 Stavros Sachtouris
            kwarg_list
391 81c60832 Stavros Sachtouris
        """
392 81c60832 Stavros Sachtouris
        flying, results = {}, {}
393 81c60832 Stavros Sachtouris
        self._init_thread_limit()
394 81c60832 Stavros Sachtouris
        for index, kwargs in enumerate(kwarg_list):
395 81c60832 Stavros Sachtouris
            self._watch_thread_limit(flying.values())
396 81c60832 Stavros Sachtouris
            flying[index] = SilentEvent(method=method, **kwargs)
397 81c60832 Stavros Sachtouris
            flying[index].start()
398 81c60832 Stavros Sachtouris
            unfinished = {}
399 81c60832 Stavros Sachtouris
            for key, thread in flying.items():
400 81c60832 Stavros Sachtouris
                if thread.isAlive():
401 81c60832 Stavros Sachtouris
                    unfinished[key] = thread
402 81c60832 Stavros Sachtouris
                elif thread.exception:
403 81c60832 Stavros Sachtouris
                    raise thread.exception
404 81c60832 Stavros Sachtouris
                else:
405 81c60832 Stavros Sachtouris
                    results[key] = thread.value
406 81c60832 Stavros Sachtouris
            flying = unfinished
407 81c60832 Stavros Sachtouris
        sendlog.info('- - - wait for threads to finish')
408 81c60832 Stavros Sachtouris
        for key, thread in flying.items():
409 81c60832 Stavros Sachtouris
            if thread.isAlive():
410 81c60832 Stavros Sachtouris
                thread.join()
411 c2e8d493 Stavros Sachtouris
            if thread.exception:
412 81c60832 Stavros Sachtouris
                raise thread.exception
413 40ddc207 Stavros Sachtouris
            results[key] = thread.value
414 81c60832 Stavros Sachtouris
        return results.values()
415 81c60832 Stavros Sachtouris
416 6ad245d5 Stavros Sachtouris
    def _raise_for_status(self, r):
417 e9db8806 Stavros Sachtouris
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
418 5dde4c83 Stavros Sachtouris
        status_msg = getattr(r, 'status', None) or ''
419 d804de82 Stavros Sachtouris
        try:
420 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r.text)
421 d804de82 Stavros Sachtouris
        except:
422 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r)
423 7966ffb8 Stavros Sachtouris
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
424 7966ffb8 Stavros Sachtouris
        raise ClientError(message, status=status)
425 6a0b1658 Giorgos Verigakis
426 4adfa919 Stavros Sachtouris
    def set_header(self, name, value, iff=True):
427 3dabe5d2 Stavros Sachtouris
        """Set a header 'name':'value'"""
428 4adfa919 Stavros Sachtouris
        if value is not None and iff:
429 f0bddbda Stavros Sachtouris
            self.headers['%s' % name] = '%s' % value
430 2f749e6e Stavros Sachtouris
431 2f749e6e Stavros Sachtouris
    def set_param(self, name, value=None, iff=True):
432 2f749e6e Stavros Sachtouris
        if iff:
433 f0bddbda Stavros Sachtouris
            self.params[name] = '%s' % value
434 2f749e6e Stavros Sachtouris
435 de73876b Stavros Sachtouris
    def request(
436 c2b5da2f Stavros Sachtouris
            self, method, path,
437 c2b5da2f Stavros Sachtouris
            async_headers=dict(), async_params=dict(),
438 24ff0a35 Stavros Sachtouris
            **kwargs):
439 34b88989 Stavros Sachtouris
        """Commit an HTTP request to base_url/path
440 34b88989 Stavros Sachtouris
        Requests are commited to and performed by Request/ResponseManager
441 34b88989 Stavros Sachtouris
        These classes perform a lazy http request. Present method, by default,
442 34b88989 Stavros Sachtouris
        enforces them to perform the http call. Hint: call present method with
443 34b88989 Stavros Sachtouris
        success=None to get a non-performed ResponseManager object.
444 34b88989 Stavros Sachtouris
        """
445 6a6175c0 Stavros Sachtouris
        assert isinstance(method, str) or isinstance(method, unicode)
446 6a6175c0 Stavros Sachtouris
        assert method
447 6a6175c0 Stavros Sachtouris
        assert isinstance(path, str) or isinstance(path, unicode)
448 d726b3d0 Stavros Sachtouris
        try:
449 c2b5da2f Stavros Sachtouris
            headers = dict(self.headers)
450 c2b5da2f Stavros Sachtouris
            headers.update(async_headers)
451 c2b5da2f Stavros Sachtouris
            params = dict(self.params)
452 c2b5da2f Stavros Sachtouris
            params.update(async_params)
453 2f749e6e Stavros Sachtouris
            success = kwargs.pop('success', 200)
454 2f749e6e Stavros Sachtouris
            data = kwargs.pop('data', None)
455 c2b5da2f Stavros Sachtouris
            headers.setdefault('X-Auth-Token', self.token)
456 2f749e6e Stavros Sachtouris
            if 'json' in kwargs:
457 de4f08ef Stavros Sachtouris
                data = dumps(kwargs.pop('json'))
458 c2b5da2f Stavros Sachtouris
                headers.setdefault('Content-Type', 'application/json')
459 2f749e6e Stavros Sachtouris
            if data:
460 c2b5da2f Stavros Sachtouris
                headers.setdefault('Content-Length', '%s' % len(data))
461 c2b5da2f Stavros Sachtouris
462 85115c12 Stavros Sachtouris
            plog = ('\t[%s]' % self) if self.LOG_PID else ''
463 64a3c0de Stavros Sachtouris
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
464 c2b5da2f Stavros Sachtouris
            req = RequestManager(
465 c2b5da2f Stavros Sachtouris
                method, self.base_url, path,
466 c2b5da2f Stavros Sachtouris
                data=data, headers=headers, params=params)
467 21871fb2 Stavros Sachtouris
            #  req.log()
468 ec928235 Stavros Sachtouris
            r = ResponseManager(
469 61d579fb Stavros Sachtouris
                req,
470 61d579fb Stavros Sachtouris
                poolsize=self.poolsize,
471 61d579fb Stavros Sachtouris
                connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
472 64a3c0de Stavros Sachtouris
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
473 64a3c0de Stavros Sachtouris
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
474 7f85a914 Stavros Sachtouris
            r._token = headers['X-Auth-Token']
475 6a6175c0 Stavros Sachtouris
        finally:
476 c2b5da2f Stavros Sachtouris
            self.headers = dict()
477 c2b5da2f Stavros Sachtouris
            self.params = dict()
478 0238c167 Stavros Sachtouris
479 0238c167 Stavros Sachtouris
        if success is not None:
480 a037fd61 Stavros Sachtouris
            # Success can either be an int or a collection
481 0238c167 Stavros Sachtouris
            success = (success,) if isinstance(success, int) else success
482 0238c167 Stavros Sachtouris
            if r.status_code not in success:
483 0238c167 Stavros Sachtouris
                self._raise_for_status(r)
484 d726b3d0 Stavros Sachtouris
        return r
485 a1c50326 Giorgos Verigakis
486 6a0b1658 Giorgos Verigakis
    def delete(self, path, **kwargs):
487 6a0b1658 Giorgos Verigakis
        return self.request('delete', path, **kwargs)
488 6a0b1658 Giorgos Verigakis
489 6a0b1658 Giorgos Verigakis
    def get(self, path, **kwargs):
490 6a0b1658 Giorgos Verigakis
        return self.request('get', path, **kwargs)
491 6a0b1658 Giorgos Verigakis
492 6a0b1658 Giorgos Verigakis
    def head(self, path, **kwargs):
493 6a0b1658 Giorgos Verigakis
        return self.request('head', path, **kwargs)
494 6a0b1658 Giorgos Verigakis
495 6a0b1658 Giorgos Verigakis
    def post(self, path, **kwargs):
496 6a0b1658 Giorgos Verigakis
        return self.request('post', path, **kwargs)
497 6a0b1658 Giorgos Verigakis
498 6a0b1658 Giorgos Verigakis
    def put(self, path, **kwargs):
499 6a0b1658 Giorgos Verigakis
        return self.request('put', path, **kwargs)
500 6a0b1658 Giorgos Verigakis
501 4adfa919 Stavros Sachtouris
    def copy(self, path, **kwargs):
502 4adfa919 Stavros Sachtouris
        return self.request('copy', path, **kwargs)
503 4adfa919 Stavros Sachtouris
504 4adfa919 Stavros Sachtouris
    def move(self, path, **kwargs):
505 4adfa919 Stavros Sachtouris
        return self.request('move', path, **kwargs)
506 6f2b87c1 Stavros Sachtouris
507 6f2b87c1 Stavros Sachtouris
508 6f2b87c1 Stavros Sachtouris
class Waiter(object):
509 6f2b87c1 Stavros Sachtouris
510 6f2b87c1 Stavros Sachtouris
    def _wait(
511 a6a44506 Stavros Sachtouris
            self, item_id, wait_status, get_status,
512 a6a44506 Stavros Sachtouris
            delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
513 a6a44506 Stavros Sachtouris
        """Wait while the item is still in wait_status or to reach it
514 6f2b87c1 Stavros Sachtouris

515 6f2b87c1 Stavros Sachtouris
        :param server_id: integer (str or int)
516 6f2b87c1 Stavros Sachtouris

517 a6a44506 Stavros Sachtouris
        :param wait_status: (str)
518 6f2b87c1 Stavros Sachtouris

519 6f2b87c1 Stavros Sachtouris
        :param get_status: (method(self, item_id)) if called, returns
520 6f2b87c1 Stavros Sachtouris
            (status, progress %) If no way to tell progress, return None
521 6f2b87c1 Stavros Sachtouris

522 6f2b87c1 Stavros Sachtouris
        :param delay: time interval between retries
523 6f2b87c1 Stavros Sachtouris

524 6f2b87c1 Stavros Sachtouris
        :param wait_cb: (method(total steps)) returns a generator for
525 6f2b87c1 Stavros Sachtouris
            reporting progress or timeouts i.e., for a progress bar
526 6f2b87c1 Stavros Sachtouris

527 a6a44506 Stavros Sachtouris
        :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)
528 a6a44506 Stavros Sachtouris

529 6f2b87c1 Stavros Sachtouris
        :returns: (str) the new mode if successful, (bool) False if timed out
530 6f2b87c1 Stavros Sachtouris
        """
531 6f2b87c1 Stavros Sachtouris
        status, progress = get_status(self, item_id)
532 6f2b87c1 Stavros Sachtouris
533 6f2b87c1 Stavros Sachtouris
        if wait_cb:
534 6f2b87c1 Stavros Sachtouris
            wait_gen = wait_cb(max_wait // delay)
535 6f2b87c1 Stavros Sachtouris
            wait_gen.next()
536 6f2b87c1 Stavros Sachtouris
537 a6a44506 Stavros Sachtouris
        if wait_for_status ^ (status != wait_status):
538 a6a44506 Stavros Sachtouris
            # if wait_cb:
539 a6a44506 Stavros Sachtouris
            #     try:
540 a6a44506 Stavros Sachtouris
            #         wait_gen.next()
541 a6a44506 Stavros Sachtouris
            #     except Exception:
542 a6a44506 Stavros Sachtouris
            #         pass
543 6f2b87c1 Stavros Sachtouris
            return status
544 6f2b87c1 Stavros Sachtouris
        old_wait = total_wait = 0
545 6f2b87c1 Stavros Sachtouris
546 a6a44506 Stavros Sachtouris
        while (wait_for_status ^ (status == wait_status)) and (
547 a6a44506 Stavros Sachtouris
                total_wait <= max_wait):
548 6f2b87c1 Stavros Sachtouris
            if wait_cb:
549 6f2b87c1 Stavros Sachtouris
                try:
550 6f2b87c1 Stavros Sachtouris
                    for i in range(total_wait - old_wait):
551 6f2b87c1 Stavros Sachtouris
                        wait_gen.next()
552 6f2b87c1 Stavros Sachtouris
                except Exception:
553 6f2b87c1 Stavros Sachtouris
                    break
554 6f2b87c1 Stavros Sachtouris
            old_wait = total_wait
555 6f2b87c1 Stavros Sachtouris
            total_wait = progress or total_wait + 1
556 6f2b87c1 Stavros Sachtouris
            sleep(delay)
557 6f2b87c1 Stavros Sachtouris
            status, progress = get_status(self, item_id)
558 6f2b87c1 Stavros Sachtouris
559 6f2b87c1 Stavros Sachtouris
        if total_wait < max_wait:
560 6f2b87c1 Stavros Sachtouris
            if wait_cb:
561 6f2b87c1 Stavros Sachtouris
                try:
562 6f2b87c1 Stavros Sachtouris
                    for i in range(max_wait):
563 6f2b87c1 Stavros Sachtouris
                        wait_gen.next()
564 6f2b87c1 Stavros Sachtouris
                except:
565 6f2b87c1 Stavros Sachtouris
                    pass
566 a6a44506 Stavros Sachtouris
        return status if (wait_for_status ^ (status != wait_status)) else False
567 a6a44506 Stavros Sachtouris
568 a6a44506 Stavros Sachtouris
    def wait_for(
569 a6a44506 Stavros Sachtouris
            self, item_id, target_status, get_status,
570 a6a44506 Stavros Sachtouris
            delay=1, max_wait=100, wait_cb=None):
571 a6a44506 Stavros Sachtouris
        self._wait(
572 a6a44506 Stavros Sachtouris
            item_id, target_status, get_status, delay, max_wait, wait_cb,
573 a6a44506 Stavros Sachtouris
            wait_for_status=True)
574 a6a44506 Stavros Sachtouris
575 a6a44506 Stavros Sachtouris
    def wait_while(
576 a6a44506 Stavros Sachtouris
            self, item_id, target_status, get_status,
577 a6a44506 Stavros Sachtouris
            delay=1, max_wait=100, wait_cb=None):
578 a6a44506 Stavros Sachtouris
        self._wait(
579 a6a44506 Stavros Sachtouris
            item_id, target_status, get_status, delay, max_wait, wait_cb,
580 a6a44506 Stavros Sachtouris
            wait_for_status=False)