4 # Copyright (C) 2007, 2008, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """HTTP client module.
27 from cStringIO import StringIO
29 from ganeti import http
30 from ganeti import compat
31 from ganeti import netutils
34 class HttpClientRequest(object):
35 def __init__(self, host, port, method, path, headers=None, post_data=None,
36 read_timeout=None, curl_config_fn=None):
37 """Describes an HTTP request.
44 @param method: Method name
46 @param path: Request path
47 @type headers: list or None
48 @param headers: Additional headers to send, list of strings
49 @type post_data: string or None
50 @param post_data: Additional data to send
51 @type read_timeout: int
52 @param read_timeout: if passed, it will be used as the read
53 timeout while reading the response from the server
54 @type curl_config_fn: callable
55 @param curl_config_fn: Function to configure cURL object before request
56 (Note: if the function configures the connection in
57 a way where it wouldn't be efficient to reuse them,
58 a "identity" property should be defined, see
59 L{HttpClientRequest.identity})
62 assert path.startswith("/"), "Path must start with slash (/)"
63 assert curl_config_fn is None or callable(curl_config_fn)
70 self.read_timeout = read_timeout
71 self.curl_config_fn = curl_config_fn
76 self.post_data = post_data
80 elif isinstance(headers, dict):
81 # Support for old interface
82 self.headers = ["%s: %s" % (name, value)
83 for name, value in headers.items()]
85 self.headers = headers
92 self.resp_status_code = None
96 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
97 "%s:%s" % (self.host, self.port),
101 return "<%s at %#x>" % (" ".join(status), id(self))
105 """Returns the full URL for this requests.
108 if netutils.IPAddress.IsValid(self.host):
109 address = netutils.FormatAddress((self.host, self.port))
111 address = "%s:%s" % (self.host, self.port)
112 # TODO: Support for non-SSL requests
113 return "https://%s%s" % (address, self.path)
117 """Returns identifier for retrieving a pooled connection for this request.
119 This allows cURL client objects to be re-used and to cache information
120 (e.g. SSL session IDs or connections).
123 parts = [self.host, self.port]
125 if self.curl_config_fn:
127 parts.append(self.curl_config_fn.identity)
128 except AttributeError:
131 return "/".join(str(i) for i in parts)
134 class _HttpClient(object):
135 def __init__(self, curl_config_fn):
136 """Initializes this class.
138 @type curl_config_fn: callable
139 @param curl_config_fn: Function to configure cURL object after
145 curl = self._CreateCurlHandle()
146 curl.setopt(pycurl.VERBOSE, False)
147 curl.setopt(pycurl.NOSIGNAL, True)
148 curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
149 curl.setopt(pycurl.PROXY, "")
151 # Pass cURL object to external config function
158 def _CreateCurlHandle():
159 """Returns a new cURL object.
164 def GetCurlHandle(self):
165 """Returns the cURL object.
170 def GetCurrentRequest(self):
171 """Returns the current request.
173 @rtype: L{HttpClientRequest} or None
178 def StartRequest(self, req):
179 """Starts a request on this client.
181 @type req: L{HttpClientRequest}
182 @param req: HTTP request
185 assert not self._req, "Another request is already started"
188 self._resp_buffer = StringIO()
192 post_data = req.post_data
193 headers = req.headers
195 # PycURL requires strings to be non-unicode
196 assert isinstance(method, str)
197 assert isinstance(url, str)
198 assert isinstance(post_data, str)
199 assert compat.all(isinstance(i, str) for i in headers)
201 # Configure cURL object for request
203 curl.setopt(pycurl.CUSTOMREQUEST, str(method))
204 curl.setopt(pycurl.URL, url)
205 curl.setopt(pycurl.POSTFIELDS, post_data)
206 curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
207 curl.setopt(pycurl.HTTPHEADER, headers)
209 if req.read_timeout is None:
210 curl.setopt(pycurl.TIMEOUT, 0)
212 curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
214 # Pass cURL object to external config function
215 if req.curl_config_fn:
216 req.curl_config_fn(curl)
218 def Done(self, errmsg):
219 """Finishes a request.
221 @type errmsg: string or None
222 @param errmsg: Error message if request failed
226 assert req, "No request"
228 logging.debug("Request %s finished, errmsg=%s", req, errmsg)
232 req.success = not bool(errmsg)
235 # Get HTTP response code
236 req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
237 req.resp_body = self._resp_buffer.getvalue()
239 # Reset client object
241 self._resp_buffer = None
243 # Ensure no potentially large variables are referenced
244 curl.setopt(pycurl.POSTFIELDS, "")
245 curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
248 class _PooledHttpClient:
249 """Data structure for HTTP client pool.
252 def __init__(self, identity, client):
253 """Initializes this class.
255 @type identity: string
256 @param identity: Client identifier for pool
257 @type client: L{_HttpClient}
258 @param client: HTTP client
261 self.identity = identity
266 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
267 "id=%s" % self.identity,
268 "lastuse=%s" % self.lastused,
271 return "<%s at %#x>" % (" ".join(status), id(self))
274 class HttpClientPool:
275 """A simple HTTP client pool.
277 Supports one pooled connection per identity (see
278 L{HttpClientRequest.identity}).
281 #: After how many generations to drop unused clients
282 _MAX_GENERATIONS_DROP = 25
284 def __init__(self, curl_config_fn):
285 """Initializes this class.
287 @type curl_config_fn: callable
288 @param curl_config_fn: Function to configure cURL object after
292 self._curl_config_fn = curl_config_fn
297 def _GetHttpClientCreator():
298 """Returns callable to create HTTP client.
303 def _Get(self, identity):
304 """Gets an HTTP client from the pool.
306 @type identity: string
307 @param identity: Client identifier
311 pclient = self._pool.pop(identity)
313 # Need to create new client
314 client = self._GetHttpClientCreator()(self._curl_config_fn)
315 pclient = _PooledHttpClient(identity, client)
316 logging.debug("Created new client %s", pclient)
318 logging.debug("Reusing client %s", pclient)
320 assert pclient.identity == identity
324 def _StartRequest(self, req):
327 @type req: L{HttpClientRequest}
328 @param req: HTTP request
331 logging.debug("Starting request %r", req)
332 pclient = self._Get(req.identity)
334 assert req.identity not in self._pool
336 pclient.client.StartRequest(req)
337 pclient.lastused = self._generation
341 def _Return(self, pclients):
342 """Returns HTTP clients to the pool.
346 logging.debug("Returning client %s to pool", pc)
347 assert pc.identity not in self._pool
348 assert pc not in self._pool.values()
349 self._pool[pc.identity] = pc
351 # Check for unused clients
352 for pc in self._pool.values():
353 if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
354 logging.debug("Removing client %s which hasn't been used"
355 " for %s generations",
356 pc, self._MAX_GENERATIONS_DROP)
357 self._pool.pop(pc.identity, None)
359 assert compat.all(pc.lastused >= (self._generation -
360 self._MAX_GENERATIONS_DROP)
361 for pc in self._pool.values())
364 def _CreateCurlMultiHandle():
365 """Creates new cURL multi handle.
368 return pycurl.CurlMulti()
370 def ProcessRequests(self, requests):
371 """Processes any number of HTTP client requests using pooled objects.
373 @type requests: list of L{HttpClientRequest}
374 @param requests: List of all requests
377 multi = self._CreateCurlMultiHandle()
380 self._generation += 1
382 assert compat.all((req.error is None and
383 req.success is None and
384 req.resp_status_code is None and
385 req.resp_body is None)
390 pclient = self._StartRequest(req)
391 curl = pclient.client.GetCurlHandle()
392 curl_to_pclient[curl] = pclient
393 multi.add_handle(curl)
394 assert pclient.client.GetCurrentRequest() == req
395 assert pclient.lastused >= 0
397 assert len(curl_to_pclient) == len(requests)
401 (ret, _) = multi.perform()
402 assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
404 if ret == pycurl.E_CALL_MULTI_PERFORM:
405 # cURL wants to be called again
409 (remaining_messages, successful, failed) = multi.info_read()
411 for curl in successful:
412 multi.remove_handle(curl)
414 pclient = curl_to_pclient[curl]
415 req = pclient.client.GetCurrentRequest()
416 pclient.client.Done(None)
418 assert not pclient.client.GetCurrentRequest()
420 for curl, errnum, errmsg in failed:
421 multi.remove_handle(curl)
423 pclient = curl_to_pclient[curl]
424 req = pclient.client.GetCurrentRequest()
425 pclient.client.Done("Error %s: %s" % (errnum, errmsg))
427 assert not pclient.client.GetCurrentRequest()
429 if remaining_messages == 0:
432 assert done_count <= len(requests)
434 if done_count == len(requests):
437 # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
438 # timeouts, which are only evaluated in multi.perform, aren't
439 # unnecessarily delayed.
442 assert compat.all(pclient.client.GetCurrentRequest() is None
443 for pclient in curl_to_pclient.values())
445 # Return clients to pool
446 self._Return(curl_to_pclient.values())
448 assert done_count == len(requests)
449 assert compat.all(req.error is not None or
451 req.resp_status_code is not None and
452 req.resp_body is not None)