Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / connection / kamakicon.py @ bcd6c6e6

History | View | Annotate | Download (7 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urlparse import urlparse
35
from urllib2 import quote
36
from objpool.http import get_http_connection
37
from traceback import format_stack
38

    
39
from kamaki.clients.connection import KamakiConnection, KamakiResponse
40
from kamaki.clients.connection.errors import KamakiConnectionError
41
from kamaki.clients.connection.errors import KamakiResponseError
42

    
43
from json import loads
44

    
45
from time import sleep
46
from httplib import ResponseNotReady
47

    
48

    
49
class KamakiHTTPResponse(KamakiResponse):
50

    
51
    def _get_response(self):
52
        if self.prefetched:
53
            return
54

    
55
        try:
56
            while True:
57
                try:
58
                    r = self.request.getresponse()
59
                except ResponseNotReady:
60
                    sleep(0.001)
61
                else:
62
                    break
63
            self.prefetched = True
64
            headers = {}
65
            for k, v in r.getheaders():
66
                headers.update({k: v})
67
            self.headers = headers
68
            self.content = r.read()
69
            self.status_code = r.status
70
            self.status = r.reason
71
        finally:
72
            try:
73
                self.request.close()
74
            except Exception as err:
75
                from kamaki.clients import recvlog
76
                recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
77
                raise
78

    
79
    @property
80
    def text(self):
81
        """
82
        :returns: (str) content
83
        """
84
        self._get_response()
85
        return '%s' % self._content
86

    
87
    @text.setter
88
    def text(self, v):
89
        pass
90

    
91
    @property
92
    def json(self):
93
        """
94
        :returns: (dict) the json-formated content
95

96
        :raises KamakiResponseError: if content is not json formated
97
        """
98
        self._get_response()
99
        try:
100
            return loads(self._content)
101
        except ValueError as err:
102
            KamakiResponseError('Response not formated in JSON - %s' % err)
103

    
104
    @json.setter
105
    def json(self, v):
106
        pass
107

    
108
    def release(self):
109
        """ Release the connection. Should always be called if the response
110
        content hasn't been used.
111
        """
112
        if not self.prefetched:
113
            try:
114
                self.request.close()
115
            except Exception as err:
116
                from kamaki.clients import recvlog
117
                recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
118
                raise
119

    
120

    
121
class KamakiHTTPConnection(KamakiConnection):
122

    
123
    def _retrieve_connection_info(self, extra_params={}):
124
        """ Set self.url to scheme://netloc/?params
125
        :param extra_params: (dict) key:val for url parameters
126

127
        :returns: (scheme, netloc)
128
        """
129
        if self.url:
130
            url = self.url if self.url[-1] == '/' else (self.url + '/')
131
        else:
132
            url = 'http://127.0.0.1'
133
        if self.path:
134
            url += self.path[1:] if self.path[0] == '/' else self.path
135
        params = dict(self.params)
136
        params.update(extra_params)
137
        for i, (key, val) in enumerate(params.items()):
138
            url += '%s%s' % ('&' if i else '?', key)
139
            if val:
140
                url += '=%s' % val
141

    
142
        parsed = urlparse(url)
143
        self.url = url
144
        self.path = parsed.path or '/'
145
        if parsed.query:
146
            self.path += '?%s' % parsed.query
147
        return (parsed.scheme, parsed.netloc)
148

    
149
    def perform_request(
150
            self,
151
            method=None, data=None, async_headers={}, async_params={}):
152
        """
153
        :param method: (str) http method ('get', 'post', etc.)
154

155
        :param data: (binary object)
156

157
        :param async_headers: (dict) key:val headers that are used only for one
158
            request instance as opposed to self.headers, which remain to be
159
            used by following or parallel requests
160

161
        :param async_params: (dict) key:val url parameters that are used only
162
            for one request instance as opposed to self.params, which remain to
163
            be used by following or parallel requests
164

165
        :returns: (KamakiHTTPResponse) a response object
166

167
        :raises KamakiConnectionError: Connection failures
168
        """
169
        (scheme, netloc) = self._retrieve_connection_info(async_params)
170
        headers = dict(self.headers)
171
        for k, v in async_headers.items():
172
            v = quote(v.encode('utf-8')) if isinstance(v, unicode) else str(v)
173
            headers[k] = v
174

    
175
        #de-unicode headers to prepare them for http
176
        http_headers = {}
177
        for k, v in headers.items():
178
            v = quote(v.encode('utf-8')) if isinstance(v, unicode) else str(v)
179
            http_headers[k] = v
180

    
181
        #get connection from pool
182
        try:
183
            conn = get_http_connection(
184
                netloc=netloc,
185
                scheme=scheme,
186
                pool_size=self.poolsize)
187
        except ValueError as ve:
188
            raise KamakiConnectionError(
189
                'Cannot establish connection to %s %s' % (self.url, ve),
190
                errno=-1)
191
        try:
192
            #Be carefull, all non-body variables should not be unicode
193
            conn.request(
194
                method=str(method.upper()),
195
                url=str(self.path),
196
                headers=http_headers,
197
                body=data)
198
        except IOError as ioe:
199
            raise KamakiConnectionError(
200
                'Cannot connect to %s: %s' % (self.url, ioe.strerror),
201
                errno=ioe.errno)
202
        except Exception as err:
203
            from kamaki.clients import recvlog
204
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
205
            conn.close()
206
            raise
207
        return KamakiHTTPResponse(conn)