Merge branch 'devel-2.7'
[ganeti-local] / lib / http / client.py
index 72836f1..849a195 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2007, 2008 Google Inc.
+# Copyright (C) 2007, 2008, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 
 """
 
-import BaseHTTPServer
-import cgi
 import logging
-import OpenSSL
-import os
-import select
-import socket
-import sys
-import time
-import signal
-import errno
+import pycurl
 import threading
+from cStringIO import StringIO
 
-from ganeti import constants
-from ganeti import serializer
-from ganeti import workerpool
-from ganeti import utils
 from ganeti import http
-
-
-HTTP_CLIENT_THREADS = 10
+from ganeti import compat
+from ganeti import netutils
+from ganeti import locking
 
 
class HttpClientRequest(object):
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               read_timeout=None, curl_config_fn=None, nicename=None,
               completion_cb=None):
    """Describes an HTTP request.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path
    @type headers: list or None
    @param headers: Additional headers to send, list of strings (a dict is
        also accepted for compatibility with the old interface)
    @type post_data: string or None
    @param post_data: Additional data to send
    @type read_timeout: int
    @param read_timeout: if passed, it will be used as the read
        timeout while reading the response from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object before request
    @type nicename: string
    @param nicename: Name, presentable to a user, to describe this request (no
                     whitespace)
    @type completion_cb: callable accepting this request object as a single
                         parameter
    @param completion_cb: Callback for request completion

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    assert curl_config_fn is None or callable(curl_config_fn)
    assert completion_cb is None or callable(completion_cb)

    # Request attributes
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn
    self.nicename = nicename
    self.completion_cb = completion_cb

    # An unset body is normalized to the empty string
    if post_data is None:
      post_data = ""
    self.post_data = post_data

    # Accept both a plain list of strings and, for the old interface, a
    # dict mapping header names to values
    if headers is None:
      headers = []
    elif isinstance(headers, dict):
      headers = ["%s: %s" % kv for kv in headers.items()]
    self.headers = headers

    # Response status
    self.success = None
    self.error = None

    # Response attributes
    self.resp_status_code = None
    self.resp_body = None

  def __repr__(self):
    cls = self.__class__
    details = " ".join(["%s.%s" % (cls.__module__, cls.__name__),
                        "%s:%s" % (self.host, self.port),
                        self.method,
                        self.path])

    return "<%s at %#x>" % (details, id(self))

  @property
  def url(self):
    """Returns the full URL for this requests.

    """
    host_port = (self.host, self.port)
    if netutils.IPAddress.IsValid(self.host):
      address = netutils.FormatAddress(host_port)
    else:
      address = "%s:%s" % host_port
    # TODO: Support for non-SSL requests
    return "https://%s%s" % (address, self.path)
 
-  def ParseStartLine(self, start_line):
-    """Parses the status line sent by the server.
 
-    """
-    # Empty lines are skipped when reading
-    assert start_line
def _StartRequest(curl, req):
  """Starts a request on a cURL object.

  @type curl: pycurl.Curl
  @param curl: cURL object
  @type req: L{HttpClientRequest}
  @param req: HTTP request
  @rtype: L{_PendingRequest}
  @return: Wrapper tracking the in-flight request

  """
  logging.debug("Starting request %r", req)

  (url, method, post_data, headers) = \
    (req.url, req.method, req.post_data, req.headers)

  # PycURL requires strings to be non-unicode
  assert isinstance(method, str)
  assert isinstance(url, str)
  assert isinstance(post_data, str)
  assert compat.all(isinstance(hdr, str) for hdr in headers)

  # The response body is accumulated in this buffer
  resp_buffer = StringIO()

  # Configure client for request
  for (option, value) in [
    (pycurl.VERBOSE, False),
    (pycurl.NOSIGNAL, True),
    (pycurl.USERAGENT, http.HTTP_GANETI_VERSION),
    (pycurl.PROXY, ""),
    (pycurl.CUSTOMREQUEST, str(method)),
    (pycurl.URL, url),
    (pycurl.POSTFIELDS, post_data),
    (pycurl.HTTPHEADER, headers),
    ]:
    curl.setopt(option, value)

  # A timeout of zero means no timeout to cURL
  if req.read_timeout is None:
    curl.setopt(pycurl.TIMEOUT, 0)
  else:
    curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))

  # Disable SSL session ID caching (pycurl >= 7.16.0)
  if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
    curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)

  curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write)

  # Pass cURL object to external config function, which gets the last word
  if req.curl_config_fn:
    req.curl_config_fn(curl)

  return _PendingRequest(curl, req, resp_buffer.getvalue)
 
-      # Disable Python's timeout
-      self.sock.settimeout(None)
 
-      # Operate in non-blocking mode
-      self.sock.setblocking(0)
class _PendingRequest:
  def __init__(self, curl, req, resp_buffer_read):
    """Initializes this class.

    @type curl: pycurl.Curl
    @param curl: cURL object
    @type req: L{HttpClientRequest}
    @param req: HTTP request
    @type resp_buffer_read: callable
    @param resp_buffer_read: Function to read response body

    """
    assert req.success is None

    self._curl = curl
    self._req = req
    self._resp_buffer_read = resp_buffer_read

  def GetCurlHandle(self):
    """Returns the cURL object.

    """
    return self._curl

  def GetCurrentRequest(self):
    """Returns the current request.

    """
    return self._req

  def Done(self, errmsg):
    """Finishes a request.

    @type errmsg: string or None
    @param errmsg: Error message if request failed

    """
    (curl, req) = (self._curl, self._req)

    assert req.success is None, "Request has already been finalized"

    logging.debug("Request %s finished, errmsg=%s", req, errmsg)

    # An empty or missing error message means success
    req.error = errmsg
    req.success = not bool(errmsg)

    # Collect response data from the cURL handle
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
    req.resp_body = self._resp_buffer_read()

    # Ensure no potentially large variables are referenced
    curl.setopt(pycurl.POSTFIELDS, "")
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)

    if req.completion_cb:
      req.completion_cb(req)
 
-  def _SendRequest(self):
-    """Sends request to server.
 
-    """
-    # Headers
-    send_headers = self.DEFAULT_HEADERS.copy()
class _NoOpRequestMonitor: # pylint: disable=W0232
  """Request monitor doing nothing.

  Used when no lock monitor callback function was given; every operation
  accepts any arguments and has no effect.

  """
  @staticmethod
  def acquire(*args, **kwargs):
    pass

  # Releasing and disabling are just as much no-ops as acquiring
  release = acquire
  Disable = acquire
 
-    # Response message
-    msg = http.HttpMessage()
 
-    # Combine request line. We only support HTTP/1.0 (no chunked transfers and
-    # no keep-alive).
-    # TODO: For keep-alive, change to HTTP/1.1
-    msg.start_line = \
-      http.HttpClientToServerStartLine(method=self.request.method.upper(),
-                                       path=self.request.path, version=http.HTTP_1_0)
-    msg.headers = send_headers
-    msg.body = self.request.post_data
class _PendingRequestMonitor:
  _LOCK = "_lock"

  def __init__(self, owner, pending_fn):
    """Initializes this class.

    @param owner: Thread owning the pending requests (its name is reported
        via L{GetLockInfo})
    @type pending_fn: callable
    @param pending_fn: Function returning the clients with pending requests

    """
    self._owner = owner
    self._pending_fn = pending_fn

    # The lock monitor runs in another thread, hence locking is necessary
    self._lock = locking.SharedLock("PendingHttpRequests")
    self.acquire = self._lock.acquire
    self.release = self._lock.release

  @locking.ssynchronized(_LOCK)
  def Disable(self):
    """Disable monitor.

    """
    self._pending_fn = None

  @locking.ssynchronized(_LOCK, shared=1)
  def GetLockInfo(self, requested): # pylint: disable=W0613
    """Retrieves information about pending requests.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}

    """
    # No need to sort here, that's being done by the lock manager and query
    # library. There are no priorities for requests, hence all show up as
    # one item under "pending".
    pending = []

    if self._pending_fn:
      owner_name = self._owner.getName()

      for client in self._pending_fn():
        req = client.GetCurrentRequest()
        if req:
          name = req.nicename
          if name is None:
            name = "%s%s" % (req.host, req.path)
          pending.append(("rpc/%s" % name, None, [owner_name], None))

    return pending
 
-  """
-  def __init__(self, request):
-    self.request = request
 
-    # Thread synchronization
-    self.done = threading.Event()
def _ProcessCurlRequests(multi, requests):
  """cURL request processor.

  This generator yields a tuple once for every completed request, successful or
  not. The first value in the tuple is the handle, the second an error message
  or C{None} for successful requests.

  @type multi: C{pycurl.CurlMulti}
  @param multi: cURL multi object
  @type requests: sequence
  @param requests: cURL request handles

  """
  for handle in requests:
    multi.add_handle(handle)

  while True:
    (ret, active) = multi.perform()
    assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

    if ret == pycurl.E_CALL_MULTI_PERFORM:
      # cURL wants to be called again
      continue

    # Drain all completion messages queued by cURL
    while True:
      (remaining, successful, failed) = multi.info_read()

      for handle in successful:
        multi.remove_handle(handle)
        yield (handle, None)

      for (handle, errnum, errmsg) in failed:
        multi.remove_handle(handle)
        yield (handle, "Error %s: %s" % (errnum, errmsg))

      if not remaining:
        break

    if not active:
      # No active handles anymore
      break

    # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
    # timeouts, which are only evaluated in multi.perform, aren't
    # unnecessarily delayed.
    multi.select(1.0)
 
-    @type requests: List of HttpClientRequest instances
-    @param requests: The requests to execute
-    @rtype: List of HttpClientRequest instances
-    @returns: The list of requests passed in
 
-    """
-    # _HttpClientPendingRequest is used for internal thread synchronization
-    pending = [_HttpClientPendingRequest(req) for req in requests]
def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl,
                    _curl_multi=pycurl.CurlMulti,
                    _curl_process=_ProcessCurlRequests):
  """Processes any number of HTTP client requests.

  @type requests: list of L{HttpClientRequest}
  @param requests: List of all requests
  @param lock_monitor_cb: Callable for registering with lock monitor

  """
  # None of the requests may have been started yet
  assert compat.all((req.error is None and
                     req.success is None and
                     req.resp_status_code is None and
                     req.resp_body is None)
                    for req in requests)

  # Prepare all requests
  clients = [_StartRequest(_curl(), req) for req in requests]
  curl_to_client = dict((client.GetCurlHandle(), client)
                        for client in clients)

  assert len(curl_to_client) == len(requests)

  if lock_monitor_cb:
    monitor = _PendingRequestMonitor(threading.currentThread(),
                                     curl_to_client.values)
    lock_monitor_cb(monitor)
  else:
    monitor = _NoOpRequestMonitor

  # Process all requests and act based on the returned values
  for (handle, errmsg) in _curl_process(_curl_multi(), curl_to_client.keys()):
    monitor.acquire(shared=0)
    try:
      curl_to_client.pop(handle).Done(errmsg)
    finally:
      monitor.release()

  assert not curl_to_client, "Not all requests were processed"

  # Don't try to read information anymore as all requests have been processed
  monitor.Disable()

  # Every request either failed with an error message or completed with a
  # status code and a body
  assert compat.all(req.error is not None or
                    (req.success and
                     req.resp_status_code is not None and
                     req.resp_body is not None)
                    for req in requests)