Revision c2b5da2f kamaki/clients/__init__.py

b/kamaki/clients/__init__.py
32 32
# or implied, of GRNET S.A.
33 33

  
34 34
from urllib2 import quote
35
from urlparse import urlparse
35 36
from threading import Thread
36 37
from json import dumps, loads
37 38
from time import time
39
from httplib import ResponseNotReady
40
from time import sleep
41
from random import random
42

  
43
from objpool.http import PooledHTTPConnection
38 44

  
39 45
from kamaki.clients.utils import logger
40
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
41
from kamaki.clients.connection.errors import KamakiConnectionError
42
from kamaki.clients.connection.errors import KamakiResponseError
43 46

  
44 47
LOG_TOKEN = False
45 48
DEBUG_LOG = logger.get_log_filename()
......
60 63
logger.add_file_logger('ClientError', __name__, filename=DEBUG_LOG)
61 64
clienterrorlog = logger.get_logger('ClientError')
62 65

  
66
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
67

  
68

  
69
def _encode(v):
70
    if v and isinstance(v, unicode):
71
        return quote(v.encode('utf-8'))
72
    return v
73

  
63 74

  
64 75
class ClientError(Exception):
65 76
    def __init__(self, message, status=0, details=None):
......
97 108
            self.details = details if details else []
98 109

  
99 110

  
111
class RequestManager(object):
112
    """Handle http request information"""
113

  
114
    def _connection_info(self, url, path, params={}):
115
        """ Set self.url to scheme://netloc/?params
116
        :param url: (str or unicode) The service url
117

  
118
        :param path: (str or unicode) The service path (url/path)
119

  
120
        :param params: (dict) Parameters to add to final url
121

  
122
        :returns: (scheme, netloc)
123
        """
124
        url = _encode(url) if url else 'http://127.0.0.1/'
125
        url += '' if url.endswith('/') else '/'
126
        if path:
127
            url += _encode(path[1:] if path.startswith('/') else path)
128
        for i, (key, val) in enumerate(params.items()):
129
            val = _encode(val)
130
            url += '%s%s' % ('&' if i else '?', key)
131
            if val:
132
                url += '=%s' % val
133
        parsed = urlparse(url)
134
        self.url = url
135
        self.path = parsed.path or '/'
136
        if parsed.query:
137
            self.path += '?%s' % parsed.query
138
        return (parsed.scheme, parsed.netloc)
139

  
140
    def __init__(
141
            self, method, url, path,
142
            data=None, headers={}, params={}):
143
        method = method.upper()
144
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
145
        if headers:
146
            assert isinstance(headers, dict)
147
            self.headers = dict(headers)
148
        self.method, self.data = method, data
149
        self.scheme, self.netloc = self._connection_info(url, path, params)
150

  
151
    def perform(self, conn):
152
        """
153
        :param conn: (httplib connection object)
154

  
155
        :returns: (HTTPResponse)
156
        """
157
        #  sendlog.debug(
158
        #    'RequestManager.perform mthd(%s), url(%s), headrs(%s), bdlen(%s)',
159
        #    self.method, self.url, self.headers, self.data)
160
        conn.request(
161
            method=str(self.method.upper()),
162
            url=str(self.path),
163
            headers=self.headers,
164
            body=self.data)
165
        while True:
166
            try:
167
                return conn.getresponse()
168
            except ResponseNotReady:
169
                sleep(0.03 * random())
170

  
171

  
172
class ResponseManager(object):
173
    """Manage the http request and handle the response data, headers, etc."""
174

  
175
    def __init__(self, request, poolsize=None):
176
        """
177
        :param request: (RequestManager)
178
        """
179
        self.request = request
180
        self._request_performed = False
181
        self.poolsize = poolsize
182

  
183
    def _get_response(self):
184
        if self._request_performed:
185
            return
186

  
187
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
188
        try:
189
            with PooledHTTPConnection(
190
                    self.request.netloc, self.request.scheme,
191
                    **pool_kw) as connection:
192
                r = self.request.perform(connection)
193
                #  recvlog.debug('ResponseManager(%s):' % r)
194
                self._request_performed = True
195
                self._headers = dict()
196
                for k, v in r.getheaders():
197
                    self.headers[k] = v
198
                    #  recvlog.debug('\t%s: %s\t(%s)' % (k, v, r))
199
                self._content = r.read()
200
                self._status_code = r.status
201
                self._status = r.reason
202
        except Exception as err:
203
            from kamaki.clients import recvlog
204
            from traceback import format_stack
205
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
206
            raise ClientError(
207
                'Failed while http-connecting to %s (%s)' % (
208
                    self.request.url,
209
                    err),
210
                1000)
211

  
212
    @property
213
    def status_code(self):
214
        self._get_response()
215
        return self._status_code
216

  
217
    @property
218
    def status(self):
219
        self._get_response()
220
        return self._status
221

  
222
    @property
223
    def headers(self):
224
        self._get_response()
225
        return self._headers
226

  
227
    @property
228
    def content(self):
229
        self._get_response()
230
        return self._content
231

  
232
    @property
233
    def text(self):
234
        """
235
        :returns: (str) content
236
        """
237
        self._get_response()
238
        return '%s' % self._content
239

  
240
    @property
241
    def json(self):
242
        """
243
        :returns: (dict) squeezed from json-formated content
244
        """
245
        self._get_response()
246
        try:
247
            return loads(self._content)
248
        except ValueError as err:
249
            ClientError('Response not formated in JSON - %s' % err)
250

  
251

  
100 252
class SilentEvent(Thread):
101 253
    """ Thread-run method(*args, **kwargs)"""
102 254
    def __init__(self, method, *args, **kwargs):
......
127 279

  
128 280
class Client(object):
129 281

  
130
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
282
    def __init__(self, base_url, token):
131 283
        self.base_url = base_url
132 284
        self.token = token
133
        self.headers = {}
285
        self.headers, self.params = dict(), dict()
134 286
        self.DATE_FORMATS = [
135 287
            '%a %b %d %H:%M:%S %Y',
136 288
            '%A, %d-%b-%y %H:%M:%S GMT',
137 289
            '%a, %d %b %Y %H:%M:%S GMT']
138
        self.http_client = http_client
139 290
        self.MAX_THREADS = 7
140 291

  
141 292
    def _init_thread_limit(self, limit=1):
......
179 330
    def set_header(self, name, value, iff=True):
180 331
        """Set a header 'name':'value'"""
181 332
        if value is not None and iff:
182
            self.http_client.set_header(name, value)
333
            self.headers[name] = value
183 334

  
184 335
    def set_param(self, name, value=None, iff=True):
185 336
        if iff:
186
            self.http_client.set_param(name, value)
337
            self.params[name] = value
187 338

  
188 339
    def request(
189
            self,
190
            method,
191
            path,
192
            async_headers={},
193
            async_params={},
340
            self, method, path,
341
            async_headers=dict(), async_params=dict(),
194 342
            **kwargs):
195 343
        """In threaded/asynchronous requests, headers and params are not safe
196 344
        Therefore, the standard self.set_header/param system can be used only
......
205 353
        assert method
206 354
        assert isinstance(path, str) or isinstance(path, unicode)
207 355
        try:
356
            headers = dict(self.headers)
357
            headers.update(async_headers)
358
            params = dict(self.params)
359
            params.update(async_params)
208 360
            success = kwargs.pop('success', 200)
209 361
            data = kwargs.pop('data', None)
210
            self.http_client.headers.setdefault('X-Auth-Token', self.token)
211

  
362
            headers.setdefault('X-Auth-Token', self.token)
212 363
            if 'json' in kwargs:
213 364
                data = dumps(kwargs.pop('json'))
214
                self.http_client.headers.setdefault(
215
                    'Content-Type',
216
                    'application/json')
365
                headers.setdefault('Content-Type', 'application/json')
217 366
            if data:
218
                self.http_client.headers.setdefault(
219
                    'Content-Length',
220
                    '%s' % len(data))
221

  
222
            sendlog.info('perform a %s @ %s', method, self.base_url)
223

  
224
            self.http_client.url = self.base_url
225
            self.http_client.path = quote(path.encode('utf8'))
226
            r = self.http_client.perform_request(
227
                method,
228
                data,
229
                async_headers,
230
                async_params)
231

  
232
            req = self.http_client
233
            sendlog.info('%s %s', method, req.url)
234
            headers = dict(req.headers)
235
            headers.update(async_headers)
236

  
237
            for key, val in headers.items():
367
                headers.setdefault('Content-Length', '%s' % len(data))
368

  
369
            req = RequestManager(
370
                method, self.base_url, path,
371
                data=data, headers=headers, params=params)
372
            sendlog.info('commit a %s @ %s\t[%s]', method, self.base_url, self)
373
            sendlog.info('\tpath: %s\t[%s]', req.path, self)
374
            for key, val in req.headers.items():
238 375
                if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
239 376
                    continue
240
                sendlog.info('\t%s: %s', key, val)
241
            sendlog.info('')
377
                sendlog.info('\t%s: %s [%s]', key, val, self)
242 378
            if data:
243 379
                datasendlog.info(data)
380
            sendlog.info('END HTTP request commit\t[%s]', self)
244 381

  
382
            r = ResponseManager(req)
245 383
            recvlog.info('%d %s', r.status_code, r.status)
246 384
            for key, val in r.headers.items():
247 385
                if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
......
249 387
                recvlog.info('%s: %s', key, val)
250 388
            if r.content:
251 389
                datarecvlog.info(r.content)
252

  
253
        except (KamakiResponseError, KamakiConnectionError) as err:
254
            from traceback import format_stack
255
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
256
            self.http_client.reset_headers()
257
            self.http_client.reset_params()
258
            errstr = '%s' % err
259
            if not errstr:
260
                errstr = ('%s' % type(err))[7:-2]
261
            status = getattr(err, 'status', getattr(err, 'errno', 0))
262
            raise ClientError('%s\n' % errstr, status=status)
263 390
        finally:
264
            self.http_client.reset_headers()
265
            self.http_client.reset_params()
391
            self.headers = dict()
392
            self.params = dict()
266 393

  
267 394
        if success is not None:
268 395
            # Success can either be an int or a collection
269 396
            success = (success,) if isinstance(success, int) else success
270 397
            if r.status_code not in success:
271
                r.release()
272 398
                self._raise_for_status(r)
273 399
        return r
274 400

  

Also available in: Unified diff