#
#
-# Copyright (C) 2007, 2008 Google Inc.
+# Copyright (C) 2007, 2008, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""
-# pylint: disable-msg=E1103
+import logging
+import pycurl
+from cStringIO import StringIO
-# # E1103: %s %r has no %r member (but some types could not be
-# inferred), since _socketobject could be ssl or not and pylint
-# doesn't parse that
-
-
-import os
-import select
-import socket
-import errno
-import threading
-
-from ganeti import workerpool
from ganeti import http
-
-
-HTTP_CLIENT_THREADS = 10
+from ganeti import compat
+from ganeti import netutils
class HttpClientRequest(object):
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               read_timeout=None, curl_config_fn=None):
    """Describes an HTTP request.

    @type host: string
    @param host: Hostname or IP address of the server
    @param port: Port of the server
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path, must start with a slash
    @type headers: list or None
    @param headers: Additional headers to send, list of strings; for
        backwards compatibility a dict mapping header names to values is
        accepted as well and converted to C{"name: value"} strings
    @type post_data: string or None
    @param post_data: Additional data to send (C{None} is stored as an
        empty string)
    @type read_timeout: int
    @param read_timeout: if passed, it will be used as the read
        timeout while reading the response from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object before request
        (Note: if the function configures the connection in
        a way where it wouldn't be efficient to reuse them,
        an "identity" property should be defined, see
        L{HttpClientRequest.identity})

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    assert curl_config_fn is None or callable(curl_config_fn)

    # Request attributes
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn

    if post_data is None:
      self.post_data = ""
    else:
      self.post_data = post_data

    if headers is None:
      self.headers = []
    elif isinstance(headers, dict):
      # Support for old interface
      self.headers = ["%s: %s" % (name, value)
                      for name, value in headers.items()]
    else:
      self.headers = headers

    # Response status
    self.success = None
    self.error = None

    # Response attributes
    self.resp_status_code = None
    self.resp_body = None

  def __repr__(self):
    # Same layout as the other pooled-client objects: "module.Class" followed
    # by the fields identifying this request
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "%s:%s" % (self.host, self.port),
              str(self.method),
              self.path]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @property
  def url(self):
    """Returns the full URL for this request.

    Valid IP addresses are formatted using L{netutils.FormatAddress}; any
    other host is combined with the port as C{host:port}. Only HTTPS URLs
    are generated.

    """
    if netutils.IPAddress.IsValid(self.host):
      # NOTE(review): presumably brackets IPv6 literals -- confirm in netutils
      address = netutils.FormatAddress((self.host, self.port))
    else:
      address = "%s:%s" % (self.host, self.port)
    # TODO: Support for non-SSL requests
    return "https://%s%s" % (address, self.path)

  @property
  def identity(self):
    """Returns identifier for retrieving a pooled connection for this request.

    This allows cURL client objects to be re-used and to cache information
    (e.g. SSL session IDs or connections).

    The identifier is C{host/port}, extended by C{/<identity>} when
    C{curl_config_fn} has an C{identity} attribute.

    """
    parts = [self.host, self.port]

    if self.curl_config_fn:
      try:
        parts.append(self.curl_config_fn.identity)
      except AttributeError:
        # Configuration function doesn't affect connection reuse
        pass

    return "/".join(str(i) for i in parts)
+class _HttpClient(object):
+ def __init__(self, curl_config_fn):
+ """Initializes this class.
+
+ @type curl_config_fn: callable
+ @param curl_config_fn: Function to configure cURL object after
+ initialization
+
+ """
+ self._req = None
+
+ curl = self._CreateCurlHandle()
+ curl.setopt(pycurl.VERBOSE, False)
+ curl.setopt(pycurl.NOSIGNAL, True)
+ curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
+ curl.setopt(pycurl.PROXY, "")
- return http.HttpServerToClientStartLine(version, status, reason)
+ # Disable SSL session ID caching (pycurl >= 7.16.0)
+ if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
+ curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
+ # Pass cURL object to external config function
+ if curl_config_fn:
+ curl_config_fn(curl)
-class HttpClientRequestExecutor(http.HttpBase):
- # Default headers
- DEFAULT_HEADERS = {
- http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
- # TODO: For keep-alive, don't send "Connection: close"
- http.HTTP_CONNECTION: "close",
- }
+ self._curl = curl
- # Timeouts in seconds for socket layer
- # TODO: Soft timeout instead of only socket timeout?
- # TODO: Make read timeout configurable per OpCode?
- CONNECT_TIMEOUT = 5
- WRITE_TIMEOUT = 10
- READ_TIMEOUT = None
- CLOSE_TIMEOUT = 1
+ @staticmethod
+ def _CreateCurlHandle():
+ """Returns a new cURL object.
- def __init__(self, req):
- """Initializes the HttpClientRequestExecutor class.
+ """
+ return pycurl.Curl()
- @type req: HttpClientRequest
- @param req: Request object
+ def GetCurlHandle(self):
+ """Returns the cURL object.
"""
- http.HttpBase.__init__(self)
- self.request = req
+ return self._curl
- try:
- # TODO: Implement connection caching/keep-alive
- self.sock = self._CreateSocket(req.ssl_params,
- req.ssl_verify_peer)
+ def GetCurrentRequest(self):
+ """Returns the current request.
- # Disable Python's timeout
- self.sock.settimeout(None)
+ @rtype: L{HttpClientRequest} or None
- # Operate in non-blocking mode
- self.sock.setblocking(0)
+ """
+ return self._req
- response_msg_reader = None
- response_msg = None
- force_close = True
+ def StartRequest(self, req):
+ """Starts a request on this client.
- self._Connect()
- try:
- self._SendRequest()
- (response_msg_reader, response_msg) = self._ReadResponse()
+ @type req: L{HttpClientRequest}
+ @param req: HTTP request
+
+ """
+ assert not self._req, "Another request is already started"
- # Only wait for server to close if we didn't have any exception.
- force_close = False
- finally:
- # TODO: Keep-alive is not supported, always close connection
- force_close = True
- http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
- self.WRITE_TIMEOUT, response_msg_reader,
- force_close)
+ self._req = req
+ self._resp_buffer = StringIO()
- self.sock.close()
- self.sock = None
+ url = req.url
+ method = req.method
+ post_data = req.post_data
+ headers = req.headers
- req.response = response_msg
+ # PycURL requires strings to be non-unicode
+ assert isinstance(method, str)
+ assert isinstance(url, str)
+ assert isinstance(post_data, str)
+ assert compat.all(isinstance(i, str) for i in headers)
- req.resp_version = req.response.start_line.version
- req.resp_status_code = req.response.start_line.code
- req.resp_reason = req.response.start_line.reason
- req.resp_headers = req.response.headers
- req.resp_body = req.response.body
+ # Configure cURL object for request
+ curl = self._curl
+ curl.setopt(pycurl.CUSTOMREQUEST, str(method))
+ curl.setopt(pycurl.URL, url)
+ curl.setopt(pycurl.POSTFIELDS, post_data)
+ curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
+ curl.setopt(pycurl.HTTPHEADER, headers)
- req.success = True
- req.error = None
+ if req.read_timeout is None:
+ curl.setopt(pycurl.TIMEOUT, 0)
+ else:
+ curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
- except http.HttpError, err:
- req.success = False
- req.error = str(err)
+ # Pass cURL object to external config function
+ if req.curl_config_fn:
+ req.curl_config_fn(curl)
- def _Connect(self):
- """Non-blocking connect to host with timeout.
+ def Done(self, errmsg):
+ """Finishes a request.
+
+ @type errmsg: string or None
+ @param errmsg: Error message if request failed
"""
- connected = False
- while True:
- try:
- connect_error = self.sock.connect_ex((self.request.host,
- self.request.port))
- except socket.gaierror, err:
- raise http.HttpError("Connection failed: %s" % str(err))
+ req = self._req
+ assert req, "No request"
- if connect_error == errno.EINTR:
- # Mask signals
- pass
+ logging.debug("Request %s finished, errmsg=%s", req, errmsg)
- elif connect_error == 0:
- # Connection established
- connected = True
- break
+ curl = self._curl
- elif connect_error == errno.EINPROGRESS:
- # Connection started
- break
+ req.success = not bool(errmsg)
+ req.error = errmsg
- raise http.HttpError("Connection failed (%s: %s)" %
- (connect_error, os.strerror(connect_error)))
+ # Get HTTP response code
+ req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
+ req.resp_body = self._resp_buffer.getvalue()
- if not connected:
- # Wait for connection
- event = http.WaitForSocketCondition(self.sock, select.POLLOUT,
- self.CONNECT_TIMEOUT)
- if event is None:
- raise http.HttpError("Timeout while connecting to server")
+ # Reset client object
+ self._req = None
+ self._resp_buffer = None
- # Get error code
- connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
- if connect_error != 0:
- raise http.HttpError("Connection failed (%s: %s)" %
- (connect_error, os.strerror(connect_error)))
+ # Ensure no potentially large variables are referenced
+ curl.setopt(pycurl.POSTFIELDS, "")
+ curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
- # Enable TCP keep-alive
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
- # If needed, Linux specific options are available to change the TCP
- # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
- # TCP_KEEPINTVL.
+class _PooledHttpClient:
+ """Data structure for HTTP client pool.
- # Do the secret SSL handshake
- if self.using_ssl:
- self.sock.set_connect_state() # pylint: disable-msg=E1103
- try:
- http.Handshake(self.sock, self.WRITE_TIMEOUT)
- except http.HttpSessionHandshakeUnexpectedEOF:
- raise http.HttpError("Server closed connection during SSL handshake")
+ """
+ def __init__(self, identity, client):
+ """Initializes this class.
- def _SendRequest(self):
- """Sends request to server.
+ @type identity: string
+ @param identity: Client identifier for pool
+ @type client: L{_HttpClient}
+ @param client: HTTP client
"""
- # Headers
- send_headers = self.DEFAULT_HEADERS.copy()
+ self.identity = identity
+ self.client = client
+ self.lastused = 0
- if self.request.headers:
- send_headers.update(self.request.headers)
+ def __repr__(self):
+ status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+ "id=%s" % self.identity,
+ "lastuse=%s" % self.lastused,
+ repr(self.client)]
- send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
- self.request.port)
+ return "<%s at %#x>" % (" ".join(status), id(self))
- # Response message
- msg = http.HttpMessage()
- # Combine request line. We only support HTTP/1.0 (no chunked transfers and
- # no keep-alive).
- # TODO: For keep-alive, change to HTTP/1.1
- msg.start_line = \
- http.HttpClientToServerStartLine(method=self.request.method.upper(),
- path=self.request.path,
- version=http.HTTP_1_0)
- msg.headers = send_headers
- msg.body = self.request.post_data
class HttpClientPool(object):
  """A simple HTTP client pool.

  Supports one pooled connection per identity (see
  L{HttpClientRequest.identity}).

  """
  #: After how many generations to drop unused clients
  _MAX_GENERATIONS_DROP = 25

  def __init__(self, curl_config_fn):
    """Initializes this class.

    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object after
        initialization

    """
    self._curl_config_fn = curl_config_fn
    # Incremented once per ProcessRequests call, used to expire idle clients
    self._generation = 0
    # Idle clients, keyed by identity
    self._pool = {}

  @staticmethod
  def _GetHttpClientCreator():
    """Returns callable to create HTTP client.

    """
    return _HttpClient

  def _Get(self, identity):
    """Gets an HTTP client from the pool.

    The client is taken out of the pool; a new one is created if no idle
    client with the given identity exists.

    @type identity: string
    @param identity: Client identifier
    @rtype: L{_PooledHttpClient}

    """
    try:
      pclient = self._pool.pop(identity)
    except KeyError:
      # Need to create new client
      client = self._GetHttpClientCreator()(self._curl_config_fn)
      pclient = _PooledHttpClient(identity, client)
      logging.debug("Created new client %s", pclient)
    else:
      logging.debug("Reusing client %s", pclient)

    assert pclient.identity == identity

    return pclient

  def _StartRequest(self, req):
    """Starts a request.

    @type req: L{HttpClientRequest}
    @param req: HTTP request
    @rtype: L{_PooledHttpClient}
    @return: The client the request was started on

    """
    logging.debug("Starting request %r", req)
    pclient = self._Get(req.identity)

    assert req.identity not in self._pool

    pclient.client.StartRequest(req)
    pclient.lastused = self._generation

    return pclient

  def _Return(self, pclients):
    """Returns HTTP clients to the pool.

    Only one client is kept per identity. If a client with the same identity
    is already pooled -- which happens when one L{ProcessRequests} call
    contains several requests with the same identity, as L{_Get} then
    creates an additional client -- the extra client is discarded. Clients
    not used for more than L{_MAX_GENERATIONS_DROP} generations are removed
    afterwards.

    @type pclients: iterable of L{_PooledHttpClient}
    @param pclients: Clients to return

    """
    for pc in pclients:
      assert pc not in self._pool.values()

      if pc.identity in self._pool:
        logging.debug("Discarding client %s, another client with the same"
                      " identity is already in the pool", pc)
        continue

      logging.debug("Returning client %s to pool", pc)
      self._pool[pc.identity] = pc

    # Check for unused clients; iterate over a copy as entries are removed
    # from the dictionary inside the loop
    for pc in list(self._pool.values()):
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
        logging.debug("Removing client %s which hasn't been used"
                      " for %s generations",
                      pc, self._MAX_GENERATIONS_DROP)
        self._pool.pop(pc.identity, None)

    assert compat.all(pc.lastused >= (self._generation -
                                      self._MAX_GENERATIONS_DROP)
                      for pc in self._pool.values())

  @staticmethod
  def _CreateCurlMultiHandle():
    """Creates new cURL multi handle.

    """
    return pycurl.CurlMulti()

  def ProcessRequests(self, requests):
    """Processes any number of HTTP client requests using pooled objects.

    All requests are run concurrently on one cURL multi handle. Results are
    stored in the request objects (C{success}, C{error}, C{resp_status_code},
    C{resp_body}).

    @type requests: list of L{HttpClientRequest}
    @param requests: List of all requests

    """
    multi = self._CreateCurlMultiHandle()

    # For client cleanup
    self._generation += 1

    # Requests must not have been processed before
    assert compat.all((req.error is None and
                       req.success is None and
                       req.resp_status_code is None and
                       req.resp_body is None)
                      for req in requests)

    curl_to_pclient = {}
    for req in requests:
      pclient = self._StartRequest(req)
      curl = pclient.client.GetCurlHandle()
      curl_to_pclient[curl] = pclient
      multi.add_handle(curl)
      assert pclient.client.GetCurrentRequest() == req
      assert pclient.lastused >= 0

    assert len(curl_to_pclient) == len(requests)

    done_count = 0
    while True:
      (ret, _) = multi.perform()
      assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

      if ret == pycurl.E_CALL_MULTI_PERFORM:
        # cURL wants to be called again
        continue

      # Collect all finished transfers
      while True:
        (remaining_messages, successful, failed) = multi.info_read()

        for curl in successful:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done(None)
          assert req.success
          assert not pclient.client.GetCurrentRequest()

        for curl, errnum, errmsg in failed:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done("Error %s: %s" % (errnum, errmsg))
          assert req.error
          assert not pclient.client.GetCurrentRequest()

        if remaining_messages == 0:
          break

      assert done_count <= len(requests)

      if done_count == len(requests):
        break

      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
      # timeouts, which are only evaluated in multi.perform, aren't
      # unnecessarily delayed.
      multi.select(1.0)

    assert compat.all(pclient.client.GetCurrentRequest() is None
                      for pclient in curl_to_pclient.values())

    # Return clients to pool
    self._Return(curl_to_pclient.values())

    assert done_count == len(requests)
    assert compat.all(req.error is not None or
                      (req.success and
                       req.resp_status_code is not None and
                       req.resp_body is not None)
                      for req in requests)