# Copyright (C) 2007, 2008, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""HTTP client module.

"""

import logging
from cStringIO import StringIO

import pycurl

from ganeti import http
from ganeti import compat
from ganeti import netutils
class HttpClientRequest(object):
    """Describes one HTTPS request and, after processing, its outcome.

    The C{success}, C{error}, C{resp_status_code} and C{resp_body}
    attributes start out as C{None} and are filled in once the request has
    been processed.

    """
    def __init__(self, host, port, method, path, headers=None, post_data=None,
                 read_timeout=None, curl_config_fn=None):
        """Describes an HTTP request.

        @type host: string
        @param host: Hostname or IP address
        @param port: Port
        @type method: string
        @param method: Method name
        @type path: string
        @param path: Request path, must start with a slash
        @type headers: list or None
        @param headers: Additional headers to send, list of strings (a
            dictionary is still accepted for the old interface)
        @type post_data: string or None
        @param post_data: Additional data to send
        @type read_timeout: int
        @param read_timeout: if passed, it will be used as the read
            timeout while reading the response from the server
        @type curl_config_fn: callable
        @param curl_config_fn: Function to configure cURL object before
            request (Note: if the function configures the connection in
            a way where it wouldn't be efficient to reuse them,
            a "identity" property should be defined, see
            L{HttpClientRequest.identity})

        """
        assert path.startswith("/"), "Path must start with slash (/)"
        assert curl_config_fn is None or callable(curl_config_fn)

        # Request attributes
        self.host = host
        self.port = port
        self.method = method
        self.path = path
        self.read_timeout = read_timeout
        self.curl_config_fn = curl_config_fn

        # The request body is later required to be a string, so a missing
        # body is normalized to the empty string
        if post_data is None:
            self.post_data = ""
        else:
            self.post_data = post_data

        if headers is None:
            self.headers = []
        elif isinstance(headers, dict):
            # Support for old interface
            self.headers = ["%s: %s" % (name, value)
                            for name, value in headers.items()]
        else:
            self.headers = headers

        # Response status
        self.success = None
        self.error = None

        # Response attributes
        self.resp_status_code = None
        self.resp_body = None

    def __repr__(self):
        # NOTE(review): the tail of this list (after the host:port entry) was
        # reconstructed as method and path -- confirm against the original
        status = ["%s.%s" % (self.__class__.__module__,
                             self.__class__.__name__),
                  "%s:%s" % (self.host, self.port),
                  self.method,
                  self.path]

        return "<%s at %#x>" % (" ".join(status), id(self))

    @property
    def url(self):
        """Returns the full URL for this request.

        """
        if netutils.IPAddress.IsValid(self.host):
            # IP addresses are formatted by the netutils helper
            address = netutils.FormatAddress((self.host, self.port))
        else:
            address = "%s:%s" % (self.host, self.port)
        # TODO: Support for non-SSL requests
        return "https://%s%s" % (address, self.path)

    @property
    def identity(self):
        """Returns identifier for retrieving a pooled connection for this request.

        This allows cURL client objects to be re-used and to cache information
        (e.g. SSL session IDs or connections).

        """
        parts = [self.host, self.port]

        if self.curl_config_fn:
            try:
                parts.append(self.curl_config_fn.identity)
            except AttributeError:
                # Configuration function doesn't declare an identity
                pass

        return "/".join(str(i) for i in parts)
class _HttpClient(object):
    """Wrapper around one cURL handle processing one request at a time.

    """
    def __init__(self, curl_config_fn):
        """Initializes this class.

        @type curl_config_fn: callable
        @param curl_config_fn: Function to configure cURL object after
            initialization

        """
        self._req = None
        self._resp_buffer = None

        curl = self._CreateCurlHandle()
        curl.setopt(pycurl.VERBOSE, False)
        curl.setopt(pycurl.NOSIGNAL, True)
        curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
        curl.setopt(pycurl.PROXY, "")

        # Disable SSL session ID caching (pycurl >= 7.16.0)
        if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
            curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)

        # Pass cURL object to external config function
        if curl_config_fn:
            curl_config_fn(curl)

        self._curl = curl

    @staticmethod
    def _CreateCurlHandle():
        """Returns a new cURL object.

        """
        return pycurl.Curl()

    def GetCurlHandle(self):
        """Returns the cURL object.

        """
        return self._curl

    def GetCurrentRequest(self):
        """Returns the current request.

        @rtype: L{HttpClientRequest} or None

        """
        return self._req

    def StartRequest(self, req):
        """Starts a request on this client.

        @type req: L{HttpClientRequest}
        @param req: HTTP request

        """
        assert not self._req, "Another request is already started"

        self._req = req
        self._resp_buffer = StringIO()

        url = req.url
        method = req.method
        post_data = req.post_data
        headers = req.headers

        # PycURL requires strings to be non-unicode
        assert isinstance(method, str)
        assert isinstance(url, str)
        assert isinstance(post_data, str)
        assert compat.all(isinstance(i, str) for i in headers)

        # Configure cURL object for request
        curl = self._curl
        curl.setopt(pycurl.CUSTOMREQUEST, str(method))
        curl.setopt(pycurl.URL, url)
        curl.setopt(pycurl.POSTFIELDS, post_data)
        curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
        curl.setopt(pycurl.HTTPHEADER, headers)

        if req.read_timeout is None:
            curl.setopt(pycurl.TIMEOUT, 0)
        else:
            curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))

        # Pass cURL object to external config function
        if req.curl_config_fn:
            req.curl_config_fn(curl)

    def Done(self, errmsg):
        """Finishes a request.

        Stores the outcome on the request object and detaches the request
        from this client.

        @type errmsg: string or None
        @param errmsg: Error message if request failed

        """
        req = self._req
        assert req, "No request"

        logging.debug("Request %s finished, errmsg=%s", req, errmsg)

        curl = self._curl

        req.success = not bool(errmsg)
        req.error = errmsg

        # Get HTTP response code
        req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
        req.resp_body = self._resp_buffer.getvalue()

        # Reset client object
        self._req = None
        self._resp_buffer = None

        # Ensure no potentially large variables are referenced
        curl.setopt(pycurl.POSTFIELDS, "")
        curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
class _PooledHttpClient(object):
    """Data structure for HTTP client pool.

    """
    def __init__(self, identity, client):
        """Initializes this class.

        @type identity: string
        @param identity: Client identifier for pool
        @type client: L{_HttpClient}
        @param client: HTTP client

        """
        self.identity = identity
        self.client = client
        # Pool generation in which this client was last used
        # NOTE(review): initial value reconstructed as 0 -- confirm
        self.lastused = 0

    def __repr__(self):
        # NOTE(review): the final list entry was reconstructed as the
        # client's repr -- confirm against the original
        status = ["%s.%s" % (self.__class__.__module__,
                             self.__class__.__name__),
                  "id=%s" % self.identity,
                  "lastuse=%s" % self.lastused,
                  repr(self.client)]

        return "<%s at %#x>" % (" ".join(status), id(self))
class HttpClientPool(object):
    """A simple HTTP client pool.

    Supports one pooled connection per identity (see
    L{HttpClientRequest.identity}).

    """
    #: After how many generations to drop unused clients
    _MAX_GENERATIONS_DROP = 25

    def __init__(self, curl_config_fn):
        """Initializes this class.

        @type curl_config_fn: callable
        @param curl_config_fn: Function to configure cURL object after
            initialization

        """
        self._curl_config_fn = curl_config_fn
        # Incremented once per call to L{ProcessRequests}
        self._generation = 0
        # Idle clients, keyed by identity
        self._pool = {}

    @staticmethod
    def _GetHttpClientCreator():
        """Returns callable to create HTTP client.

        """
        return _HttpClient

    def _Get(self, identity):
        """Gets an HTTP client from the pool.

        The client is removed from the pool; a new one is created if none
        is pooled for the identity.

        @type identity: string
        @param identity: Client identifier

        """
        try:
            pclient = self._pool.pop(identity)
        except KeyError:
            # Need to create new client
            client = self._GetHttpClientCreator()(self._curl_config_fn)
            pclient = _PooledHttpClient(identity, client)
            logging.debug("Created new client %s", pclient)
        else:
            logging.debug("Reusing client %s", pclient)

        assert pclient.identity == identity

        return pclient

    def _StartRequest(self, req):
        """Starts a request.

        @type req: L{HttpClientRequest}
        @param req: HTTP request

        """
        logging.debug("Starting request %r", req)
        pclient = self._Get(req.identity)

        assert req.identity not in self._pool

        pclient.client.StartRequest(req)
        pclient.lastused = self._generation

        return pclient

    def _Return(self, pclients):
        """Returns HTTP clients to the pool.

        Afterwards clients which haven't been used for more than
        L{_MAX_GENERATIONS_DROP} generations are removed from the pool.

        """
        for pc in pclients:
            logging.debug("Returning client %s to pool", pc)
            assert pc.identity not in self._pool
            assert pc not in self._pool.values()
            self._pool[pc.identity] = pc

        # Check for unused clients; iterate over a snapshot as entries are
        # removed from the dictionary inside the loop
        for pc in list(self._pool.values()):
            if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
                logging.debug("Removing client %s which hasn't been used"
                              " for %s generations",
                              pc, self._MAX_GENERATIONS_DROP)
                self._pool.pop(pc.identity, None)

        assert compat.all(pc.lastused >= (self._generation -
                                          self._MAX_GENERATIONS_DROP)
                          for pc in self._pool.values())

    @staticmethod
    def _CreateCurlMultiHandle():
        """Creates new cURL multi handle.

        """
        return pycurl.CurlMulti()

    def ProcessRequests(self, requests):
        """Processes any number of HTTP client requests using pooled objects.

        The outcome is stored on the request objects themselves.

        @type requests: list of L{HttpClientRequest}
        @param requests: List of all requests

        """
        multi = self._CreateCurlMultiHandle()

        # For client cleanup
        self._generation += 1

        assert compat.all((req.error is None and
                           req.success is None and
                           req.resp_status_code is None and
                           req.resp_body is None)
                          for req in requests)

        # Start all requests and remember which client owns which handle
        curl_to_pclient = {}
        for req in requests:
            pclient = self._StartRequest(req)
            curl = pclient.client.GetCurlHandle()
            curl_to_pclient[curl] = pclient
            multi.add_handle(curl)
            assert pclient.client.GetCurrentRequest() == req
            assert pclient.lastused >= 0

        assert len(curl_to_pclient) == len(requests)

        done_count = 0
        while True:
            (ret, _) = multi.perform()
            assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

            if ret == pycurl.E_CALL_MULTI_PERFORM:
                # cURL wants to be called again
                continue

            # Drain all pending completion messages
            while True:
                (remaining_messages, successful, failed) = multi.info_read()

                for curl in successful:
                    multi.remove_handle(curl)
                    done_count += 1
                    pclient = curl_to_pclient[curl]
                    req = pclient.client.GetCurrentRequest()
                    pclient.client.Done(None)
                    assert req.success
                    assert not pclient.client.GetCurrentRequest()

                for curl, errnum, errmsg in failed:
                    multi.remove_handle(curl)
                    done_count += 1
                    pclient = curl_to_pclient[curl]
                    req = pclient.client.GetCurrentRequest()
                    pclient.client.Done("Error %s: %s" % (errnum, errmsg))
                    assert req.error
                    assert not pclient.client.GetCurrentRequest()

                if remaining_messages == 0:
                    break

            assert done_count <= len(requests)

            if done_count == len(requests):
                break

            # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
            # timeouts, which are only evaluated in multi.perform, aren't
            # unnecessarily delayed.
            # NOTE(review): the wait call and its one second timeout were
            # reconstructed -- confirm against the original
            multi.select(1.0)

        assert compat.all(pclient.client.GetCurrentRequest() is None
                          for pclient in curl_to_pclient.values())

        # Return clients to pool
        self._Return(curl_to_pclient.values())

        assert done_count == len(requests)
        assert compat.all(req.error is not None or
                          (req.success and
                           req.resp_status_code is not None and
                           req.resp_body is not None)
                          for req in requests)