Cluster verify: verify hypervisor parameters only once
[ganeti-local] / lib / http / client.py
index 717581f..8cc4744 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
 #
 #
 
-# Copyright (C) 2007, 2008 Google Inc.
+# Copyright (C) 2007, 2008, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 
 """
 
 
 """
 
-import os
-import select
-import socket
-import errno
-import threading
+import logging
+import pycurl
+from cStringIO import StringIO
 
 
-from ganeti import workerpool
 from ganeti import http
 from ganeti import http
-
-
-HTTP_CLIENT_THREADS = 10
+from ganeti import compat
+from ganeti import netutils
 
 
 class HttpClientRequest(object):
   def __init__(self, host, port, method, path, headers=None, post_data=None,
 
 
 class HttpClientRequest(object):
   def __init__(self, host, port, method, path, headers=None, post_data=None,
-               ssl_params=None, ssl_verify_peer=False):
+               read_timeout=None, curl_config_fn=None):
     """Describes an HTTP request.
 
     @type host: string
     """Describes an HTTP request.
 
     @type host: string
@@ -48,339 +44,414 @@ class HttpClientRequest(object):
     @param method: Method name
     @type path: string
     @param path: Request path
     @param method: Method name
     @type path: string
     @param path: Request path
-    @type headers: dict or None
-    @param headers: Additional headers to send
+    @type headers: list or None
+    @param headers: Additional headers to send, list of strings
     @type post_data: string or None
     @param post_data: Additional data to send
     @type post_data: string or None
     @param post_data: Additional data to send
-    @type ssl_params: HttpSslParams
-    @param ssl_params: SSL key and certificate
-    @type ssl_verify_peer: bool
-    @param ssl_verify_peer: Whether to compare our certificate with
-        server's certificate
+    @type read_timeout: int
+    @param read_timeout: if passed, it will be used as the read
+        timeout while reading the response from the server
+    @type curl_config_fn: callable
+    @param curl_config_fn: Function to configure cURL object before request
+                           (Note: if the function configures the connection in
+                           a way where it wouldn't be efficient to reuse them,
+                           an "identity" property should be defined, see
+                           L{HttpClientRequest.identity})
 
     """
 
     """
-    if post_data is not None:
-      assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
-        "Only POST and GET requests support sending data"
-
     assert path.startswith("/"), "Path must start with slash (/)"
     assert path.startswith("/"), "Path must start with slash (/)"
+    assert curl_config_fn is None or callable(curl_config_fn)
 
     # Request attributes
     self.host = host
     self.port = port
 
     # Request attributes
     self.host = host
     self.port = port
-    self.ssl_params = ssl_params
-    self.ssl_verify_peer = ssl_verify_peer
     self.method = method
     self.path = path
     self.method = method
     self.path = path
-    self.headers = headers
-    self.post_data = post_data
-
+    self.read_timeout = read_timeout
+    self.curl_config_fn = curl_config_fn
+
+    if post_data is None:
+      self.post_data = ""
+    else:
+      self.post_data = post_data
+
+    if headers is None:
+      self.headers = []
+    elif isinstance(headers, dict):
+      # Support for old interface
+      self.headers = ["%s: %s" % (name, value)
+                      for name, value in headers.items()]
+    else:
+      self.headers = headers
+
+    # Response status
     self.success = None
     self.error = None
 
     self.success = None
     self.error = None
 
-    # Raw response
-    self.response = None
-
     # Response attributes
     # Response attributes
-    self.resp_version = None
     self.resp_status_code = None
     self.resp_status_code = None
-    self.resp_reason = None
-    self.resp_headers = None
     self.resp_body = None
 
     self.resp_body = None
 
+  def __repr__(self):
+    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+              "%s:%s" % (self.host, self.port),
+              self.method,
+              self.path]
+
+    return "<%s at %#x>" % (" ".join(status), id(self))
 
 
-class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
-  pass
+  @property
+  def url(self):
+    """Returns the full URL for this request.
 
 
+    """
+    if netutils.IPAddress.IsValid(self.host):
+      address = netutils.FormatAddress((self.host, self.port))
+    else:
+      address = "%s:%s" % (self.host, self.port)
+    # TODO: Support for non-SSL requests
+    return "https://%s%s" % (address, self.path)
 
 
-class _HttpServerToClientMessageReader(http.HttpMessageReader):
-  # Length limits
-  START_LINE_LENGTH_MAX = 512
-  HEADER_LENGTH_MAX = 4096
+  @property
+  def identity(self):
+    """Returns identifier for retrieving a pooled connection for this request.
 
 
-  def ParseStartLine(self, start_line):
-    """Parses the status line sent by the server.
+    This allows cURL client objects to be re-used and to cache information
+    (e.g. SSL session IDs or connections).
 
     """
 
     """
-    # Empty lines are skipped when reading
-    assert start_line
+    parts = [self.host, self.port]
 
 
-    try:
-      [version, status, reason] = start_line.split(None, 2)
-    except ValueError:
+    if self.curl_config_fn:
       try:
       try:
-        [version, status] = start_line.split(None, 1)
-        reason = ""
-      except ValueError:
-        version = http.HTTP_0_9
+        parts.append(self.curl_config_fn.identity)
+      except AttributeError:
+        pass
 
 
-    if version:
-      version = version.upper()
+    return "/".join(str(i) for i in parts)
 
 
-    # The status code is a three-digit number
-    try:
-      status = int(status)
-      if status < 100 or status > 999:
-        status = -1
-    except ValueError:
-      status = -1
 
 
-    if status == -1:
-      raise http.HttpError("Invalid status code (%r)" % start_line)
+class _HttpClient(object):
+  def __init__(self, curl_config_fn):
+    """Initializes this class.
 
 
-    return http.HttpServerToClientStartLine(version, status, reason)
+    @type curl_config_fn: callable
+    @param curl_config_fn: Function to configure cURL object after
+                           initialization
 
 
+    """
+    self._req = None
+
+    curl = self._CreateCurlHandle()
+    curl.setopt(pycurl.VERBOSE, False)
+    curl.setopt(pycurl.NOSIGNAL, True)
+    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
+    curl.setopt(pycurl.PROXY, "")
 
 
-class HttpClientRequestExecutor(http.HttpBase):
-  # Default headers
-  DEFAULT_HEADERS = {
-    http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
-    # TODO: For keep-alive, don't send "Connection: close"
-    http.HTTP_CONNECTION: "close",
-    }
+    # Disable SSL session ID caching (pycurl >= 7.16.0)
+    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
+      curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
 
 
-  # Timeouts in seconds for socket layer
-  # TODO: Soft timeout instead of only socket timeout?
-  # TODO: Make read timeout configurable per OpCode?
-  CONNECT_TIMEOUT = 5
-  WRITE_TIMEOUT = 10
-  READ_TIMEOUT = None
-  CLOSE_TIMEOUT = 1
+    # Pass cURL object to external config function
+    if curl_config_fn:
+      curl_config_fn(curl)
 
 
-  def __init__(self, req):
-    """Initializes the HttpClientRequestExecutor class.
+    self._curl = curl
 
 
-    @type req: HttpClientRequest
-    @param req: Request object
+  @staticmethod
+  def _CreateCurlHandle():
+    """Returns a new cURL object.
 
     """
 
     """
-    http.HttpBase.__init__(self)
-    self.request = req
+    return pycurl.Curl()
 
 
-    try:
-      # TODO: Implement connection caching/keep-alive
-      self.sock = self._CreateSocket(req.ssl_params,
-                                     req.ssl_verify_peer)
+  def GetCurlHandle(self):
+    """Returns the cURL object.
 
 
-      # Disable Python's timeout
-      self.sock.settimeout(None)
+    """
+    return self._curl
 
 
-      # Operate in non-blocking mode
-      self.sock.setblocking(0)
+  def GetCurrentRequest(self):
+    """Returns the current request.
 
 
-      response_msg_reader = None
-      response_msg = None
-      force_close = True
+    @rtype: L{HttpClientRequest} or None
 
 
-      self._Connect()
-      try:
-        self._SendRequest()
-        (response_msg_reader, response_msg) = self._ReadResponse()
+    """
+    return self._req
+
+  def StartRequest(self, req):
+    """Starts a request on this client.
+
+    @type req: L{HttpClientRequest}
+    @param req: HTTP request
+
+    """
+    assert not self._req, "Another request is already started"
+
+    self._req = req
+    self._resp_buffer = StringIO()
 
 
-        # Only wait for server to close if we didn't have any exception.
-        force_close = False
-      finally:
-        # TODO: Keep-alive is not supported, always close connection
-        force_close = True
-        http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
-                                self.WRITE_TIMEOUT, response_msg_reader,
-                                force_close)
+    url = req.url
+    method = req.method
+    post_data = req.post_data
+    headers = req.headers
 
 
-      self.sock.close()
-      self.sock = None
+    # PycURL requires strings to be non-unicode
+    assert isinstance(method, str)
+    assert isinstance(url, str)
+    assert isinstance(post_data, str)
+    assert compat.all(isinstance(i, str) for i in headers)
 
 
-      req.response = response_msg
+    # Configure cURL object for request
+    curl = self._curl
+    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
+    curl.setopt(pycurl.URL, url)
+    curl.setopt(pycurl.POSTFIELDS, post_data)
+    curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
+    curl.setopt(pycurl.HTTPHEADER, headers)
 
 
-      req.resp_version = req.response.start_line.version
-      req.resp_status_code = req.response.start_line.code
-      req.resp_reason = req.response.start_line.reason
-      req.resp_headers = req.response.headers
-      req.resp_body = req.response.body
+    if req.read_timeout is None:
+      curl.setopt(pycurl.TIMEOUT, 0)
+    else:
+      curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
 
 
-      req.success = True
-      req.error = None
+    # Pass cURL object to external config function
+    if req.curl_config_fn:
+      req.curl_config_fn(curl)
 
 
-    except http.HttpError, err:
-      req.success = False
-      req.error = str(err)
+  def Done(self, errmsg):
+    """Finishes a request.
 
 
-  def _Connect(self):
-    """Non-blocking connect to host with timeout.
+    @type errmsg: string or None
+    @param errmsg: Error message if request failed
 
     """
 
     """
-    connected = False
-    while True:
-      try:
-        connect_error = self.sock.connect_ex((self.request.host,
-                                              self.request.port))
-      except socket.gaierror, err:
-        raise http.HttpError("Connection failed: %s" % str(err))
+    req = self._req
+    assert req, "No request"
 
 
-      if connect_error == errno.EINTR:
-        # Mask signals
-        pass
+    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
 
 
-      elif connect_error == 0:
-        # Connection established
-        connected = True
-        break
+    curl = self._curl
 
 
-      elif connect_error == errno.EINPROGRESS:
-        # Connection started
-        break
+    req.success = not bool(errmsg)
+    req.error = errmsg
 
 
-      raise http.HttpError("Connection failed (%s: %s)" %
-                             (connect_error, os.strerror(connect_error)))
+    # Get HTTP response code
+    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
+    req.resp_body = self._resp_buffer.getvalue()
 
 
-    if not connected:
-      # Wait for connection
-      event = http.WaitForSocketCondition(self.sock, select.POLLOUT,
-                                          self.CONNECT_TIMEOUT)
-      if event is None:
-        raise http.HttpError("Timeout while connecting to server")
+    # Reset client object
+    self._req = None
+    self._resp_buffer = None
 
 
-      # Get error code
-      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
-      if connect_error != 0:
-        raise http.HttpError("Connection failed (%s: %s)" %
-                               (connect_error, os.strerror(connect_error)))
+    # Ensure no potentially large variables are referenced
+    curl.setopt(pycurl.POSTFIELDS, "")
+    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
 
 
-    # Enable TCP keep-alive
-    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
 
 
-    # If needed, Linux specific options are available to change the TCP
-    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
-    # TCP_KEEPINTVL.
+class _PooledHttpClient:
+  """Data structure for HTTP client pool.
 
 
-    # Do the secret SSL handshake
-    if self.using_ssl:
-      self.sock.set_connect_state()
-      try:
-        http.Handshake(self.sock, self.WRITE_TIMEOUT)
-      except http.HttpSessionHandshakeUnexpectedEOF:
-        raise http.HttpError("Server closed connection during SSL handshake")
+  """
+  def __init__(self, identity, client):
+    """Initializes this class.
 
 
-  def _SendRequest(self):
-    """Sends request to server.
+    @type identity: string
+    @param identity: Client identifier for pool
+    @type client: L{_HttpClient}
+    @param client: HTTP client
 
     """
 
     """
-    # Headers
-    send_headers = self.DEFAULT_HEADERS.copy()
+    self.identity = identity
+    self.client = client
+    self.lastused = 0
 
 
-    if self.request.headers:
-      send_headers.update(self.request.headers)
+  def __repr__(self):
+    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+              "id=%s" % self.identity,
+              "lastuse=%s" % self.lastused,
+              repr(self.client)]
 
 
-    send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
-                                              self.request.port)
+    return "<%s at %#x>" % (" ".join(status), id(self))
 
 
-    # Response message
-    msg = http.HttpMessage()
 
 
-    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
-    # no keep-alive).
-    # TODO: For keep-alive, change to HTTP/1.1
-    msg.start_line = \
-      http.HttpClientToServerStartLine(method=self.request.method.upper(),
-                                       path=self.request.path,
-                                       version=http.HTTP_1_0)
-    msg.headers = send_headers
-    msg.body = self.request.post_data
+class HttpClientPool:
+  """A simple HTTP client pool.
 
 
-    try:
-      _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
-    except http.HttpSocketTimeout:
-      raise http.HttpError("Timeout while sending request")
-    except socket.error, err:
-      raise http.HttpError("Error sending request: %s" % err)
+  Supports one pooled connection per identity (see
+  L{HttpClientRequest.identity}).
+
+  """
+  #: After how many generations to drop unused clients
+  _MAX_GENERATIONS_DROP = 25
+
+  def __init__(self, curl_config_fn):
+    """Initializes this class.
 
 
-  def _ReadResponse(self):
-    """Read response from server.
+    @type curl_config_fn: callable
+    @param curl_config_fn: Function to configure cURL object after
+                           initialization
 
     """
 
     """
-    response_msg = http.HttpMessage()
+    self._curl_config_fn = curl_config_fn
+    self._generation = 0
+    self._pool = {}
 
 
-    try:
-      response_msg_reader = \
-        _HttpServerToClientMessageReader(self.sock, response_msg,
-                                         self.READ_TIMEOUT)
-    except http.HttpSocketTimeout:
-      raise http.HttpError("Timeout while reading response")
-    except socket.error, err:
-      raise http.HttpError("Error reading response: %s" % err)
+  @staticmethod
+  def _GetHttpClientCreator():
+    """Returns callable to create HTTP client.
 
 
-    return (response_msg_reader, response_msg)
+    """
+    return _HttpClient
 
 
+  def _Get(self, identity):
+    """Gets an HTTP client from the pool.
 
 
-class _HttpClientPendingRequest(object):
-  """Data class for pending requests.
+    @type identity: string
+    @param identity: Client identifier
 
 
-  """
-  def __init__(self, request):
-    self.request = request
+    """
+    try:
+      pclient = self._pool.pop(identity)
+    except KeyError:
+      # Need to create new client
+      client = self._GetHttpClientCreator()(self._curl_config_fn)
+      pclient = _PooledHttpClient(identity, client)
+      logging.debug("Created new client %s", pclient)
+    else:
+      logging.debug("Reusing client %s", pclient)
 
 
-    # Thread synchronization
-    self.done = threading.Event()
+    assert pclient.identity == identity
 
 
+    return pclient
 
 
-class HttpClientWorker(workerpool.BaseWorker):
-  """HTTP client worker class.
+  def _StartRequest(self, req):
+    """Starts a request.
 
 
-  """
-  def RunTask(self, pend_req):
-    try:
-      HttpClientRequestExecutor(pend_req.request)
-    finally:
-      pend_req.done.set()
+    @type req: L{HttpClientRequest}
+    @param req: HTTP request
 
 
+    """
+    logging.debug("Starting request %r", req)
+    pclient = self._Get(req.identity)
 
 
-class HttpClientWorkerPool(workerpool.WorkerPool):
-  def __init__(self, manager):
-    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
-                                   HttpClientWorker)
-    self.manager = manager
+    assert req.identity not in self._pool
 
 
+    pclient.client.StartRequest(req)
+    pclient.lastused = self._generation
 
 
-class HttpClientManager(object):
-  """Manages HTTP requests.
+    return pclient
 
 
-  """
-  def __init__(self):
-    self._wpool = HttpClientWorkerPool(self)
+  def _Return(self, pclients):
+    """Returns HTTP clients to the pool.
 
 
-  def __del__(self):
-    self.Shutdown()
+    """
+    for pc in pclients:
+      logging.debug("Returning client %s to pool", pc)
+      assert pc.identity not in self._pool
+      assert pc not in self._pool.values()
+      self._pool[pc.identity] = pc
+
+    # Check for unused clients
+    for pc in self._pool.values():
+      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
+        logging.debug("Removing client %s which hasn't been used"
+                      " for %s generations",
+                      pc, self._MAX_GENERATIONS_DROP)
+        self._pool.pop(pc.identity, None)
+
+    assert compat.all(pc.lastused >= (self._generation -
+                                      self._MAX_GENERATIONS_DROP)
+                      for pc in self._pool.values())
+
+  @staticmethod
+  def _CreateCurlMultiHandle():
+    """Creates new cURL multi handle.
 
 
-  def ExecRequests(self, requests):
-    """Execute HTTP requests.
+    """
+    return pycurl.CurlMulti()
 
 
-    This function can be called from multiple threads at the same time.
+  def ProcessRequests(self, requests):
+    """Processes any number of HTTP client requests using pooled objects.
 
 
-    @type requests: List of HttpClientRequest instances
-    @param requests: The requests to execute
-    @rtype: List of HttpClientRequest instances
-    @return: The list of requests passed in
+    @type requests: list of L{HttpClientRequest}
+    @param requests: List of all requests
 
     """
 
     """
-    # _HttpClientPendingRequest is used for internal thread synchronization
-    pending = [_HttpClientPendingRequest(req) for req in requests]
+    multi = self._CreateCurlMultiHandle()
 
 
-    try:
-      # Add requests to queue
-      for pend_req in pending:
-        self._wpool.AddTask(pend_req)
+    # For client cleanup
+    self._generation += 1
+
+    assert compat.all((req.error is None and
+                       req.success is None and
+                       req.resp_status_code is None and
+                       req.resp_body is None)
+                      for req in requests)
+
+    curl_to_pclient = {}
+    for req in requests:
+      pclient = self._StartRequest(req)
+      curl = pclient.client.GetCurlHandle()
+      curl_to_pclient[curl] = pclient
+      multi.add_handle(curl)
+      assert pclient.client.GetCurrentRequest() == req
+      assert pclient.lastused >= 0
+
+    assert len(curl_to_pclient) == len(requests)
+
+    done_count = 0
+    while True:
+      (ret, _) = multi.perform()
+      assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
+
+      if ret == pycurl.E_CALL_MULTI_PERFORM:
+        # cURL wants to be called again
+        continue
+
+      while True:
+        (remaining_messages, successful, failed) = multi.info_read()
+
+        for curl in successful:
+          multi.remove_handle(curl)
+          done_count += 1
+          pclient = curl_to_pclient[curl]
+          req = pclient.client.GetCurrentRequest()
+          pclient.client.Done(None)
+          assert req.success
+          assert not pclient.client.GetCurrentRequest()
+
+        for curl, errnum, errmsg in failed:
+          multi.remove_handle(curl)
+          done_count += 1
+          pclient = curl_to_pclient[curl]
+          req = pclient.client.GetCurrentRequest()
+          pclient.client.Done("Error %s: %s" % (errnum, errmsg))
+          assert req.error
+          assert not pclient.client.GetCurrentRequest()
+
+        if remaining_messages == 0:
+          break
+
+      assert done_count <= len(requests)
+
+      if done_count == len(requests):
+        break
 
 
-    finally:
-      # In case of an exception we should still wait for the rest, otherwise
-      # another thread from the worker pool could modify the request object
-      # after we returned.
+      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
+      # timeouts, which are only evaluated in multi.perform, aren't
+      # unnecessarily delayed.
+      multi.select(1.0)
 
 
-      # And wait for them to finish
-      for pend_req in pending:
-        pend_req.done.wait()
+    assert compat.all(pclient.client.GetCurrentRequest() is None
+                      for pclient in curl_to_pclient.values())
 
 
-    # Return original list
-    return requests
+    # Return clients to pool
+    self._Return(curl_to_pclient.values())
 
 
-  def Shutdown(self):
-    self._wpool.Quiesce()
-    self._wpool.TerminateWorkers()
+    assert done_count == len(requests)
+    assert compat.all(req.error is not None or
+                      (req.success and
+                       req.resp_status_code is not None and
+                       req.resp_body is not None)
+                      for req in requests)