# Copyright (C) 2007, 2008, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""HTTP client module.

"""
import logging
import pycurl
from cStringIO import StringIO

from ganeti import http
from ganeti import compat
from ganeti import netutils
class HttpClientRequest(object):
  """Description of one HTTPS request plus slots for its response.

  The response attributes (C{success}, C{error}, C{resp_status_code} and
  C{resp_body}) stay C{None} until the request has been processed.

  NOTE(review): the text this class was restored from had lost its
  indentation and its short lines. The plain attribute assignments, the
  C{__repr__} entries after "host:port" and the name of the C{url} property
  were re-derived from how they are used elsewhere in this module; confirm
  them against the upstream file.

  """
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               read_timeout=None, curl_config_fn=None):
    """Describes an HTTP request.

    @type host: string
    @param host: Hostname or IP address
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path, must start with a slash
    @type headers: list or None
    @param headers: Additional headers to send, list of strings; a dict
        mapping header names to values is still accepted (old interface)
    @type post_data: string or None
    @param post_data: Additional data to send
    @type read_timeout: int
    @param read_timeout: if passed, it will be used as the read
        timeout while reading the response from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object before request
                           (Note: if the function configures the connection in
                           a way where it wouldn't be efficient to reuse them,
                           a "identity" property should be defined, see
                           L{HttpClientRequest.identity})

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    assert curl_config_fn is None or callable(curl_config_fn)

    # Request attributes
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn

    # The request body is later asserted to be a string, hence "" for None
    if post_data is None:
      self.post_data = ""
    else:
      self.post_data = post_data

    if headers is None:
      self.headers = []
    elif isinstance(headers, dict):
      # Support for old interface
      self.headers = ["%s: %s" % (name, value)
                      for name, value in headers.items()]
    else:
      self.headers = headers

    # Response status
    self.success = None
    self.error = None

    # Response attributes
    self.resp_status_code = None
    self.resp_body = None

  def __repr__(self):
    """Returns a short description naming class, endpoint, method and path.

    """
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "%s:%s" % (self.host, self.port),
              self.method,
              self.path]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @property
  def url(self):
    """Returns the full URL for this requests.

    The scheme is always C{https}.

    """
    if netutils.IPAddress.IsValid(self.host):
      # Literal IP addresses are formatted by netutils
      address = netutils.FormatAddress((self.host, self.port))
    else:
      address = "%s:%s" % (self.host, self.port)
    # TODO: Support for non-SSL requests
    return "https://%s%s" % (address, self.path)

  @property
  def identity(self):
    """Returns identifier for retrieving a pooled connection for this request.

    This allows cURL client objects to be re-used and to cache information
    (e.g. SSL session IDs or connections).

    @rtype: string
    @return: Host and port joined by a slash, followed by the "identity"
        attribute of the cURL configuration function if it has one

    """
    parts = [self.host, self.port]

    if self.curl_config_fn:
      try:
        parts.append(self.curl_config_fn.identity)
      except AttributeError:
        # Configuration functions are not required to define an identity
        pass

    return "/".join(str(i) for i in parts)
134 class _HttpClient(object):
135 def __init__(self, curl_config_fn):
136 """Initializes this class.
138 @type curl_config_fn: callable
139 @param curl_config_fn: Function to configure cURL object after
145 curl = self._CreateCurlHandle()
146 curl.setopt(pycurl.VERBOSE, False)
147 curl.setopt(pycurl.NOSIGNAL, True)
148 curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
149 curl.setopt(pycurl.PROXY, "")
151 # Disable SSL session ID caching (pycurl >= 7.16.0)
152 if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
153 curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
155 # Pass cURL object to external config function
162 def _CreateCurlHandle():
163 """Returns a new cURL object.
168 def GetCurlHandle(self):
169 """Returns the cURL object.
174 def GetCurrentRequest(self):
175 """Returns the current request.
177 @rtype: L{HttpClientRequest} or None
182 def StartRequest(self, req):
183 """Starts a request on this client.
185 @type req: L{HttpClientRequest}
186 @param req: HTTP request
189 assert not self._req, "Another request is already started"
191 logging.debug("Starting request %r", req)
194 self._resp_buffer = StringIO()
198 post_data = req.post_data
199 headers = req.headers
201 # PycURL requires strings to be non-unicode
202 assert isinstance(method, str)
203 assert isinstance(url, str)
204 assert isinstance(post_data, str)
205 assert compat.all(isinstance(i, str) for i in headers)
207 # Configure cURL object for request
209 curl.setopt(pycurl.CUSTOMREQUEST, str(method))
210 curl.setopt(pycurl.URL, url)
211 curl.setopt(pycurl.POSTFIELDS, post_data)
212 curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
213 curl.setopt(pycurl.HTTPHEADER, headers)
215 if req.read_timeout is None:
216 curl.setopt(pycurl.TIMEOUT, 0)
218 curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
220 # Pass cURL object to external config function
221 if req.curl_config_fn:
222 req.curl_config_fn(curl)
224 def Done(self, errmsg):
225 """Finishes a request.
227 @type errmsg: string or None
228 @param errmsg: Error message if request failed
232 assert req, "No request"
234 logging.debug("Request %s finished, errmsg=%s", req, errmsg)
238 req.success = not bool(errmsg)
241 # Get HTTP response code
242 req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
243 req.resp_body = self._resp_buffer.getvalue()
245 # Reset client object
247 self._resp_buffer = None
249 # Ensure no potentially large variables are referenced
250 curl.setopt(pycurl.POSTFIELDS, "")
251 curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
254 class _PooledHttpClient:
255 """Data structure for HTTP client pool.
258 def __init__(self, identity, client):
259 """Initializes this class.
261 @type identity: string
262 @param identity: Client identifier for pool
263 @type client: L{_HttpClient}
264 @param client: HTTP client
267 self.identity = identity
272 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
273 "id=%s" % self.identity,
274 "lastuse=%s" % self.lastused,
277 return "<%s at %#x>" % (" ".join(status), id(self))
class HttpClientPool(object):
  """A simple HTTP client pool.

  Supports one pooled connection per identity (see
  L{HttpClientRequest.identity}).

  NOTE(review): the text this class was restored from had lost its
  indentation and its short lines (loop headers, C{try}/C{except}/C{else},
  C{break}/C{continue}, the C{done_count} bookkeeping and similar); they
  were re-derived from the remaining statements and should be confirmed
  against the upstream file.

  """
  #: After how many generations to drop unused clients
  _MAX_GENERATIONS_DROP = 25

  def __init__(self, curl_config_fn):
    """Initializes this class.

    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object after
                           initialization

    """
    self._curl_config_fn = curl_config_fn
    # Incremented once per call to ProcessRequests, used to age clients
    self._generation = 0
    # Idle clients, indexed by identity
    self._pool = {}

    # Create custom logger for HTTP client pool. Change logging level to
    # C{logging.NOTSET} to get more details.
    self._logger = logging.getLogger(self.__class__.__name__)
    self._logger.setLevel(logging.INFO)

  @staticmethod
  def _GetHttpClientCreator():
    """Returns callable to create HTTP client.

    """
    return _HttpClient

  def _Get(self, identity):
    """Gets an HTTP client from the pool.

    The client is removed from the pool; a new one is created if none is
    pooled for the identity.

    @type identity: string
    @param identity: Client identifier
    @rtype: L{_PooledHttpClient}

    """
    try:
      pclient = self._pool.pop(identity)
    except KeyError:
      # Need to create new client
      client = self._GetHttpClientCreator()(self._curl_config_fn)
      pclient = _PooledHttpClient(identity, client)
      self._logger.debug("Created new client %s", pclient)
    else:
      self._logger.debug("Reusing client %s", pclient)

    assert pclient.identity == identity

    return pclient

  def _StartRequest(self, req):
    """Starts a request.

    @type req: L{HttpClientRequest}
    @param req: HTTP request
    @rtype: L{_PooledHttpClient}
    @return: Pooled client the request was started on

    """
    pclient = self._Get(req.identity)

    assert req.identity not in self._pool

    pclient.client.StartRequest(req)
    pclient.lastused = self._generation

    return pclient

  def _Return(self, pclients):
    """Returns HTTP clients to the pool.

    Also drops pooled clients which haven't been used for more than
    L{_MAX_GENERATIONS_DROP} generations.

    @type pclients: iterable of L{_PooledHttpClient}
    @param pclients: Clients to put back

    """
    for pc in pclients:
      self._logger.debug("Returning client %s to pool", pc)
      assert pc.identity not in self._pool
      assert pc not in self._pool.values()
      self._pool[pc.identity] = pc

    # Check for unused clients. Iterate over a snapshot, entries are removed
    # from the pool inside the loop.
    for pc in list(self._pool.values()):
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
        self._logger.debug("Removing client %s which hasn't been used"
                           " for %s generations",
                           pc, self._MAX_GENERATIONS_DROP)
        self._pool.pop(pc.identity, None)

    assert compat.all(pc.lastused >= (self._generation -
                                      self._MAX_GENERATIONS_DROP)
                      for pc in self._pool.values())

  @staticmethod
  def _CreateCurlMultiHandle():
    """Creates new cURL multi handle.

    """
    return pycurl.CurlMulti()

  def ProcessRequests(self, requests):
    """Processes any number of HTTP client requests using pooled objects.

    Results are stored on the request objects themselves; the method only
    finishes once every request has either succeeded or failed.

    @type requests: list of L{HttpClientRequest}
    @param requests: List of all requests

    """
    multi = self._CreateCurlMultiHandle()

    # For client cleanup
    self._generation += 1

    assert compat.all((req.error is None and
                       req.success is None and
                       req.resp_status_code is None and
                       req.resp_body is None)
                      for req in requests)

    curl_to_pclient = {}
    for req in requests:
      pclient = self._StartRequest(req)
      curl = pclient.client.GetCurlHandle()
      curl_to_pclient[curl] = pclient
      multi.add_handle(curl)
      assert pclient.client.GetCurrentRequest() == req
      assert pclient.lastused >= 0

    assert len(curl_to_pclient) == len(requests)

    done_count = 0
    while True:
      (ret, _) = multi.perform()
      assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

      if ret == pycurl.E_CALL_MULTI_PERFORM:
        # cURL wants to be called again
        continue

      # Collect all finished transfers
      while True:
        (remaining_messages, successful, failed) = multi.info_read()

        for curl in successful:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done(None)
          assert req.success
          assert not pclient.client.GetCurrentRequest()

        for curl, errnum, errmsg in failed:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done("Error %s: %s" % (errnum, errmsg))
          assert req.error
          assert not pclient.client.GetCurrentRequest()

        if remaining_messages == 0:
          break

      assert done_count <= len(requests)

      if done_count == len(requests):
        break

      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
      # timeouts, which are only evaluated in multi.perform, aren't
      # unnecessarily delayed.
      # NOTE(review): the wait call itself was missing from the damaged
      # text; a one second select is assumed here, confirm the value
      multi.select(1.0)

    assert compat.all(pclient.client.GetCurrentRequest() is None
                      for pclient in curl_to_pclient.values())

    # Return clients to pool
    self._Return(curl_to_pclient.values())

    assert done_count == len(requests)
    assert compat.all(req.error is not None or
                      (req.success and
                       req.resp_status_code is not None and
                       req.resp_body is not None)
                      for req in requests)