Some docstring updates
[ganeti-local] / lib / http / __init__.py
index c3bc04c..5b09680 100644 (file)
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 
-"""HTTP server module.
+"""HTTP module.
 
 """
 
-import BaseHTTPServer
-import cgi
 import logging
 import mimetools
 import OpenSSL
-import os
 import select
 import socket
-import sys
-import time
-import signal
-import logging
 import errno
-import threading
 
 from cStringIO import StringIO
 
 from ganeti import constants
 from ganeti import serializer
-from ganeti import workerpool
 from ganeti import utils
 
 
-HTTP_CLIENT_THREADS = 10
-
 HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
 
-WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
-MONTHNAME = [None,
-             'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
-             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
-
-# Default error message
-DEFAULT_ERROR_CONTENT_TYPE = "text/html"
-DEFAULT_ERROR_MESSAGE = """\
-<head>
-<title>Error response</title>
-</head>
-<body>
-<h1>Error response</h1>
-<p>Error code %(code)d.
-<p>Message: %(message)s.
-<p>Error code explanation: %(code)s = %(explain)s.
-</body>
-"""
-
 HTTP_OK = 200
 HTTP_NO_CONTENT = 204
 HTTP_NOT_MODIFIED = 304
@@ -80,6 +50,7 @@ HTTP_GET = "GET"
 HTTP_HEAD = "HEAD"
 HTTP_POST = "POST"
 HTTP_PUT = "PUT"
+HTTP_DELETE = "DELETE"
 
 HTTP_ETAG = "ETag"
 HTTP_HOST = "Host"
@@ -90,21 +61,42 @@ HTTP_CONTENT_TYPE = "Content-Type"
 HTTP_CONTENT_LENGTH = "Content-Length"
 HTTP_CONNECTION = "Connection"
 HTTP_KEEP_ALIVE = "Keep-Alive"
+HTTP_WWW_AUTHENTICATE = "WWW-Authenticate"
+HTTP_AUTHORIZATION = "Authorization"
+HTTP_AUTHENTICATION_INFO = "Authentication-Info"
+HTTP_ALLOW = "Allow"
 
 _SSL_UNEXPECTED_EOF = "Unexpected EOF"
 
 # Socket operations
 (SOCKOP_SEND,
  SOCKOP_RECV,
- SOCKOP_SHUTDOWN) = range(3)
+ SOCKOP_SHUTDOWN,
+ SOCKOP_HANDSHAKE) = range(4)
+
+# send/receive quantum
+SOCK_BUF_SIZE = 32768
+
+
+class HttpError(Exception):
+  """Internal exception for HTTP errors.
+
+  This should only be used for internal error reporting.
+
+  """
 
 
-class SocketClosed(socket.error):
-  pass
+class HttpConnectionClosed(Exception):
+  """Internal exception for a closed connection.
+
+  This should only be used for internal error reporting. Only use
+  it if there's no other way to report this condition.
+
+  """
 
 
-class _HttpClientError(Exception):
-  """Internal exception for HTTP client errors.
+class HttpSessionHandshakeUnexpectedEOF(HttpError):
+  """Internal exception for errors during SSL handshake.
 
   This should only be used for internal error reporting.
 
@@ -119,53 +111,171 @@ class HttpSocketTimeout(Exception):
   """
 
 
-class HTTPException(Exception):
+class HttpException(Exception):
   code = None
   message = None
 
-  def __init__(self, message=None):
+  def __init__(self, message=None, headers=None):
     Exception.__init__(self)
-    if message is not None:
-      self.message = message
+    self.message = message
+    self.headers = headers
+
 
+class HttpBadRequest(HttpException):
+  """400 Bad Request
 
-class HTTPBadRequest(HTTPException):
+  RFC2616, 10.4.1: The request could not be understood by the server
+  due to malformed syntax. The client SHOULD NOT repeat the request
+  without modifications.
+
+  """
   code = 400
 
 
-class HTTPForbidden(HTTPException):
+class HttpUnauthorized(HttpException):
+  """401 Unauthorized
+
+  RFC2616, section 10.4.2: The request requires user
+  authentication. The response MUST include a WWW-Authenticate header
+  field (section 14.47) containing a challenge applicable to the
+  requested resource.
+
+  """
+  code = 401
+
+
+class HttpForbidden(HttpException):
+  """403 Forbidden
+
+  RFC2616, 10.4.4: The server understood the request, but is refusing
+  to fulfill it.  Authorization will not help and the request SHOULD
+  NOT be repeated.
+
+  """
   code = 403
 
 
-class HTTPNotFound(HTTPException):
+class HttpNotFound(HttpException):
+  """404 Not Found
+
+  RFC2616, 10.4.5: The server has not found anything matching the
+  Request-URI.  No indication is given of whether the condition is
+  temporary or permanent.
+
+  """
   code = 404
 
 
-class HTTPGone(HTTPException):
+class HttpMethodNotAllowed(HttpException):
+  """405 Method Not Allowed
+
+  RFC2616, 10.4.6: The method specified in the Request-Line is not
+  allowed for the resource identified by the Request-URI. The response
+  MUST include an Allow header containing a list of valid methods for
+  the requested resource.
+
+  """
+  code = 405
+
+
+class HttpRequestTimeout(HttpException):
+  """408 Request Timeout
+
+  RFC2616, 10.4.9: The client did not produce a request within the
+  time that the server was prepared to wait. The client MAY repeat the
+  request without modifications at any later time.
+
+  """
+  code = 408
+
+
+class HttpConflict(HttpException):
+  """409 Conflict
+
+  RFC2616, 10.4.10: The request could not be completed due to a
+  conflict with the current state of the resource. This code is only
+  allowed in situations where it is expected that the user might be
+  able to resolve the conflict and resubmit the request.
+
+  """
+  code = 409
+
+
+class HttpGone(HttpException):
+  """410 Gone
+
+  RFC2616, 10.4.11: The requested resource is no longer available at
+  the server and no forwarding address is known. This condition is
+  expected to be considered permanent.
+
+  """
   code = 410
 
 
-class HTTPLengthRequired(HTTPException):
+class HttpLengthRequired(HttpException):
+  """411 Length Required
+
+  RFC2616, 10.4.12: The server refuses to accept the request without a
+  defined Content-Length. The client MAY repeat the request if it adds
+  a valid Content-Length header field containing the length of the
+  message-body in the request message.
+
+  """
   code = 411
 
 
-class HTTPInternalError(HTTPException):
+class HttpPreconditionFailed(HttpException):
+  """412 Precondition Failed
+
+  RFC2616, 10.4.13: The precondition given in one or more of the
+  request-header fields evaluated to false when it was tested on the
+  server.
+
+  """
+  code = 412
+
+
+class HttpInternalServerError(HttpException):
+  """500 Internal Server Error
+
+  RFC2616, 10.5.1: The server encountered an unexpected condition
+  which prevented it from fulfilling the request.
+
+  """
   code = 500
 
 
-class HTTPNotImplemented(HTTPException):
+class HttpNotImplemented(HttpException):
+  """501 Not Implemented
+
+  RFC2616, 10.5.2: The server does not support the functionality
+  required to fulfill the request.
+
+  """
   code = 501
 
 
-class HTTPServiceUnavailable(HTTPException):
+class HttpServiceUnavailable(HttpException):
+  """503 Service Unavailable
+
+  RFC2616, 10.5.4: The server is currently unable to handle the
+  request due to a temporary overloading or maintenance of the server.
+
+  """
   code = 503
 
 
-class HTTPVersionNotSupported(HTTPException):
+class HttpVersionNotSupported(HttpException):
+  """505 HTTP Version Not Supported
+
+  RFC2616, 10.5.6: The server does not support, or refuses to support,
+  the HTTP protocol version that was used in the request message.
+
+  """
   code = 505
 
 
-class HTTPJsonConverter:
+class HttpJsonConverter:
   CONTENT_TYPE = "application/json"
 
   def Encode(self, data):
@@ -175,13 +285,11 @@ class HTTPJsonConverter:
     return serializer.LoadJson(data)
 
 
-def WaitForSocketCondition(poller, sock, event, timeout):
+def WaitForSocketCondition(sock, event, timeout):
   """Waits for a condition to occur on the socket.
 
-  @type poller: select.Poller
-  @param poller: Poller object as created by select.poll()
   @type sock: socket
-  @param socket: Wait for events on this socket
+  @param sock: Wait for events on this socket
   @type event: int
   @param event: ORed condition (see select module)
   @type timeout: float or None
@@ -197,6 +305,7 @@ def WaitForSocketCondition(poller, sock, event, timeout):
     # Poller object expects milliseconds
     timeout *= 1000
 
+  poller = select.poll()
   poller.register(sock, event)
   try:
     while True:
@@ -214,26 +323,25 @@ def WaitForSocketCondition(poller, sock, event, timeout):
     poller.unregister(sock)
 
 
-def SocketOperation(poller, sock, op, arg1, timeout):
+def SocketOperation(sock, op, arg1, timeout):
   """Wrapper around socket functions.
 
   This function abstracts error handling for socket operations, especially
   for the complicated interaction with OpenSSL.
 
-  @type poller: select.Poller
-  @param poller: Poller object as created by select.poll()
   @type sock: socket
-  @param socket: Socket for the operation
+  @param sock: Socket for the operation
   @type op: int
   @param op: Operation to execute (SOCKOP_* constants)
   @type arg1: any
   @param arg1: Parameter for function (if needed)
   @type timeout: None or float
   @param timeout: Timeout in seconds or None
+  @return: Return value of socket function
 
   """
   # TODO: event_poll/event_check/override
-  if op == SOCKOP_SEND:
+  if op in (SOCKOP_SEND, SOCKOP_HANDSHAKE):
     event_poll = select.POLLOUT
     event_check = select.POLLOUT
 
@@ -252,18 +360,23 @@ def SocketOperation(poller, sock, op, arg1, timeout):
   else:
     raise AssertionError("Invalid socket operation")
 
+  # Handshake is only supported by SSL sockets
+  if (op == SOCKOP_HANDSHAKE and
+      not isinstance(sock, OpenSSL.SSL.ConnectionType)):
+    return
+
   # No override by default
   event_override = 0
 
   while True:
     # Poll only for certain operations and when asked for by an override
-    if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
+    if event_override or op in (SOCKOP_SEND, SOCKOP_RECV, SOCKOP_HANDSHAKE):
       if event_override:
         wait_for_event = event_override
       else:
         wait_for_event = event_poll
 
-      event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
+      event = WaitForSocketCondition(sock, wait_for_event, timeout)
       if event is None:
         raise HttpSocketTimeout()
 
@@ -292,6 +405,9 @@ def SocketOperation(poller, sock, op, arg1, timeout):
           else:
             return sock.shutdown(arg1)
 
+        elif op == SOCKOP_HANDSHAKE:
+          return sock.do_handshake()
+
       except OpenSSL.SSL.WantWriteError:
         # OpenSSL wants to write, poll for POLLOUT
         event_override = select.POLLOUT
@@ -305,6 +421,21 @@ def SocketOperation(poller, sock, op, arg1, timeout):
       except OpenSSL.SSL.WantX509LookupError:
         continue
 
+      except OpenSSL.SSL.ZeroReturnError, err:
+        # SSL Connection has been closed. In SSL 3.0 and TLS 1.0, this only
+        # occurs if a closure alert has occurred in the protocol, i.e. the
+        # connection has been closed cleanly. Note that this does not
+        # necessarily mean that the transport layer (e.g. a socket) has been
+        # closed.
+        if op == SOCKOP_SEND:
+          # Can happen during a renegotiation
+          raise HttpConnectionClosed(err.args)
+        elif op == SOCKOP_RECV:
+          return ""
+
+        # SSL_shutdown shouldn't return SSL_ERROR_ZERO_RETURN
+        raise socket.error(err.args)
+
       except OpenSSL.SSL.SysCallError, err:
         if op == SOCKOP_SEND:
           # arg1 is the data when writing
@@ -313,9 +444,13 @@ def SocketOperation(poller, sock, op, arg1, timeout):
             # and can be ignored
             return 0
 
-        elif op == SOCKOP_RECV:
-          if err.args == (-1, _SSL_UNEXPECTED_EOF):
+        if err.args == (-1, _SSL_UNEXPECTED_EOF):
+          if op == SOCKOP_RECV:
             return ""
+          elif op == SOCKOP_HANDSHAKE:
+            # Can happen if peer disconnects directly after the connection is
+            # opened.
+            raise HttpSessionHandshakeUnexpectedEOF(err.args)
 
         raise socket.error(err.args)
 
@@ -330,6 +465,65 @@ def SocketOperation(poller, sock, op, arg1, timeout):
       raise
 
 
+def ShutdownConnection(sock, close_timeout, write_timeout, msgreader, force):
+  """Closes the connection.
+
+  @type sock: socket
+  @param sock: Socket to be shut down
+  @type close_timeout: float
+  @param close_timeout: How long to wait for the peer to close
+      the connection
+  @type write_timeout: float
+  @param write_timeout: Write timeout for shutdown
+  @type msgreader: http.HttpMessageReader
+  @param msgreader: Request message reader, used to determine whether
+      peer should close connection
+  @type force: bool
+  @param force: Whether to forcibly close the connection without
+      waiting for peer
+
+  """
+  #print msgreader.peer_will_close, force
+  if msgreader and msgreader.peer_will_close and not force:
+    # Wait for peer to close
+    try:
+      # Check whether it's actually closed
+      if not SocketOperation(sock, SOCKOP_RECV, 1, close_timeout):
+        return
+    except (socket.error, HttpError, HttpSocketTimeout):
+      # Ignore errors at this stage
+      pass
+
+  # Close the connection from our side
+  try:
+    # We don't care about the return value, see NOTES in SSL_shutdown(3).
+    SocketOperation(sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
+                    write_timeout)
+  except HttpSocketTimeout:
+    raise HttpError("Timeout while shutting down connection")
+  except socket.error, err:
+    # Ignore ENOTCONN
+    if not (err.args and err.args[0] == errno.ENOTCONN):
+      raise HttpError("Error while shutting down connection: %s" % err)
+
+
+def Handshake(sock, write_timeout):
+  """Shakes peer's hands.
+
+  @type sock: socket
+  @param sock: Socket to be shut down
+  @type write_timeout: float
+  @param write_timeout: Write timeout for handshake
+
+  """
+  try:
+    return SocketOperation(sock, SOCKOP_HANDSHAKE, None, write_timeout)
+  except HttpSocketTimeout:
+    raise HttpError("Timeout during SSL handshake")
+  except socket.error, err:
+    raise HttpError("Error in SSL handshake: %s" % err)
+
+
 class HttpSslParams(object):
   """Data class for SSL key and certificate.
 
@@ -340,7 +534,8 @@ class HttpSslParams(object):
     @type ssl_key_path: string
     @param ssl_key_path: Path to file containing SSL key in PEM format
     @type ssl_cert_path: string
-    @param ssl_cert_path: Path to file containing SSL certificate in PEM format
+    @param ssl_cert_path: Path to file containing SSL certificate
+        in PEM format
 
     """
     self.ssl_key_pem = utils.ReadFile(ssl_key_path)
@@ -355,12 +550,12 @@ class HttpSslParams(object):
                                            self.ssl_cert_pem)
 
 
-class HttpSocketBase(object):
+class HttpBase(object):
   """Base class for HTTP server and client.
 
   """
   def __init__(self):
-    self._using_ssl = None
+    self.using_ssl = None
     self._ssl_params = None
     self._ssl_key = None
     self._ssl_cert = None
@@ -371,8 +566,8 @@ class HttpSocketBase(object):
     @type ssl_params: HttpSslParams
     @param ssl_params: SSL key and certificate
     @type ssl_verify_peer: bool
-    @param ssl_verify_peer: Whether to require client certificate and compare
-                            it with our certificate
+    @param ssl_verify_peer: Whether to require client certificate
+        and compare it with our certificate
 
     """
     self._ssl_params = ssl_params
@@ -380,9 +575,9 @@ class HttpSocketBase(object):
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
     # Should we enable SSL?
-    self._using_ssl = ssl_params is not None
+    self.using_ssl = ssl_params is not None
 
-    if not self._using_ssl:
+    if not self.using_ssl:
       return sock
 
     self._ssl_key = ssl_params.GetKey()
@@ -415,697 +610,198 @@ class HttpSocketBase(object):
             self._ssl_cert.digest("md5") == cert.digest("md5"))
 
 
-class HttpServerRequestExecutor(object):
-  """Implements server side of HTTP
-
-  This class implements the server side of HTTP. It's based on code of Python's
-  BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
-  character encodings. Keep-alive connections are not supported.
+class HttpMessage(object):
+  """Data structure for HTTP message.
 
   """
-  # The default request version.  This only affects responses up until
-  # the point where the request line is parsed, so it mainly decides what
-  # the client gets back when sending a malformed request line.
-  # Most web servers default to HTTP 0.9, i.e. don't send a status line.
-  default_request_version = HTTP_0_9
-
-  # Error message settings
-  error_message_format = DEFAULT_ERROR_MESSAGE
-  error_content_type = DEFAULT_ERROR_CONTENT_TYPE
-
-  responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
+  def __init__(self):
+    self.start_line = None
+    self.headers = None
+    self.body = None
+    self.decoded_body = None
 
-  def __init__(self, server, conn, client_addr, fileio_class):
-    """Initializes this class.
 
-    Part of the initialization is reading the request and eventual POST/PUT
-    data sent by the client.
+class HttpClientToServerStartLine(object):
+  """Data structure for HTTP request start line.
 
-    """
-    self._server = server
-
-    # We default rfile to buffered because otherwise it could be
-    # really slow for large data (a getc() call per byte); we make
-    # wfile unbuffered because (a) often after a write() we want to
-    # read and we need to flush the line; (b) big writes to unbuffered
-    # files are typically optimized by stdio even when big reads
-    # aren't.
-    self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
-    self.wfile = fileio_class(conn, mode="wb", bufsize=0)
-
-    self.client_addr = client_addr
-
-    self.request_headers = None
-    self.request_method = None
-    self.request_path = None
-    self.request_requestline = None
-    self.request_version = self.default_request_version
-
-    self.response_body = None
-    self.response_code = HTTP_OK
-    self.response_content_type = None
-    self.response_headers = {}
-
-    logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
-    try:
-      try:
-        try:
-          try:
-            # Read, parse and handle request
-            self._ReadRequest()
-            self._ReadPostData()
-            self._HandleRequest()
-          except HTTPException, err:
-            self._SetErrorStatus(err)
-        finally:
-          # Try to send a response
-          self._SendResponse()
-          self._Close()
-      except SocketClosed:
-        pass
-    finally:
-      logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
-
-  def _Close(self):
-    if not self.wfile.closed:
-      self.wfile.flush()
-    self.wfile.close()
-    self.rfile.close()
-
-  def _DateTimeHeader(self):
-    """Return the current date and time formatted for a message header.
+  """
+  def __init__(self, method, path, version):
+    self.method = method
+    self.path = path
+    self.version = version
 
-    """
-    (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
-    return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
-            (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
+  def __str__(self):
+    return "%s %s %s" % (self.method, self.path, self.version)
 
-  def _SetErrorStatus(self, err):
-    """Sets the response code and body from a HTTPException.
 
-    @type err: HTTPException
-    @param err: Exception instance
+class HttpServerToClientStartLine(object):
+  """Data structure for HTTP response start line.
 
-    """
-    try:
-      (shortmsg, longmsg) = self.responses[err.code]
-    except KeyError:
-      shortmsg = longmsg = "Unknown"
+  """
+  def __init__(self, version, code, reason):
+    self.version = version
+    self.code = code
+    self.reason = reason
 
-    if err.message:
-      message = err.message
-    else:
-      message = shortmsg
+  def __str__(self):
+    return "%s %s %s" % (self.version, self.code, self.reason)
 
-    values = {
-      "code": err.code,
-      "message": cgi.escape(message),
-      "explain": longmsg,
-      }
 
-    self.response_code = err.code
-    self.response_content_type = self.error_content_type
-    self.response_body = self.error_message_format % values
+class HttpMessageWriter(object):
+  """Writes an HTTP message to a socket.
 
-  def _HandleRequest(self):
-    """Handle the actual request.
+  """
+  def __init__(self, sock, msg, write_timeout):
+    """Initializes this class and writes an HTTP message to a socket.
 
-    Calls the actual handler function and converts exceptions into HTTP errors.
+    @type sock: socket
+    @param sock: Socket to be written to
+    @type msg: http.HttpMessage
+    @param msg: HTTP message to be written
+    @type write_timeout: float
+    @param write_timeout: Write timeout for socket
 
     """
-    # Don't do anything if there's already been a problem
-    if self.response_code != HTTP_OK:
-      return
+    self._msg = msg
 
-    assert self.request_method, "Status code %s requires a method" % HTTP_OK
+    self._PrepareMessage()
 
-    # Check whether client is still there
-    self.rfile.read(0)
+    buf = self._FormatMessage()
 
-    try:
-      try:
-        result = self._server.HandleRequest(self)
+    pos = 0
+    end = len(buf)
+    while pos < end:
+      # Send only SOCK_BUF_SIZE bytes at a time
+      data = buf[pos:(pos + SOCK_BUF_SIZE)]
 
-        # TODO: Content-type
-        encoder = HTTPJsonConverter()
-        body = encoder.Encode(result)
+      sent = SocketOperation(sock, SOCKOP_SEND, data, write_timeout)
 
-        self.response_content_type = encoder.CONTENT_TYPE
-        self.response_body = body
-      except (HTTPException, KeyboardInterrupt, SystemExit):
-        raise
-      except Exception, err:
-        logging.exception("Caught exception")
-        raise HTTPInternalError(message=str(err))
-      except:
-        logging.exception("Unknown exception")
-        raise HTTPInternalError(message="Unknown error")
+      # Remove sent bytes
+      pos += sent
 
-    except HTTPException, err:
-      self._SetErrorStatus(err)
+    assert pos == end, "Message wasn't sent completely"
 
-  def _SendResponse(self):
-    """Sends response to the client.
+  def _PrepareMessage(self):
+    """Prepares the HTTP message by setting mandatory headers.
 
     """
-    # Check whether client is still there
-    self.rfile.read(0)
-
-    logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
-                 self.request_requestline, self.response_code)
+    # RFC2616, section 4.3: "The presence of a message-body in a request is
+    # signaled by the inclusion of a Content-Length or Transfer-Encoding header
+    # field in the request's message-headers."
+    if self._msg.body:
+      self._msg.headers[HTTP_CONTENT_LENGTH] = len(self._msg.body)
 
-    if self.response_code in self.responses:
-      response_message = self.responses[self.response_code][0]
-    else:
-      response_message = ""
-
-    if self.request_version != HTTP_0_9:
-      self.wfile.write("%s %d %s\r\n" %
-                       (self.request_version, self.response_code,
-                        response_message))
-      self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
-      self._SendHeader(HTTP_DATE, self._DateTimeHeader())
-      self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
-      self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
-      for key, val in self.response_headers.iteritems():
-        self._SendHeader(key, val)
-
-      # We don't support keep-alive at this time
-      self._SendHeader(HTTP_CONNECTION, "close")
-      self.wfile.write("\r\n")
-
-    if (self.request_method != HTTP_HEAD and
-        self.response_code >= HTTP_OK and
-        self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
-      self.wfile.write(self.response_body)
-
-  def _SendHeader(self, name, value):
-    if self.request_version != HTTP_0_9:
-      self.wfile.write("%s: %s\r\n" % (name, value))
-
-  def _ReadRequest(self):
-    """Reads and parses request line
+  def _FormatMessage(self):
+    """Serializes the HTTP message into a string.
 
     """
-    raw_requestline = self.rfile.readline()
-
-    requestline = raw_requestline
-    if requestline[-2:] == '\r\n':
-      requestline = requestline[:-2]
-    elif requestline[-1:] == '\n':
-      requestline = requestline[:-1]
-
-    if not requestline:
-      raise HTTPBadRequest("Empty request line")
-
-    self.request_requestline = requestline
-
-    logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
+    buf = StringIO()
 
-    words = requestline.split()
+    # Add start line
+    buf.write(str(self._msg.start_line))
+    buf.write("\r\n")
 
-    if len(words) == 3:
-      [method, path, version] = words
-      if version[:5] != 'HTTP/':
-        raise HTTPBadRequest("Bad request version (%r)" % version)
+    # Add headers
+    if self._msg.start_line.version != HTTP_0_9:
+      for name, value in self._msg.headers.iteritems():
+        buf.write("%s: %s\r\n" % (name, value))
 
-      try:
-        base_version_number = version.split('/', 1)[1]
-        version_number = base_version_number.split(".")
-
-        # RFC 2145 section 3.1 says there can be only one "." and
-        #   - major and minor numbers MUST be treated as
-        #      separate integers;
-        #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
-        #      turn is lower than HTTP/12.3;
-        #   - Leading zeros MUST be ignored by recipients.
-        if len(version_number) != 2:
-          raise HTTPBadRequest("Bad request version (%r)" % version)
-
-        version_number = int(version_number[0]), int(version_number[1])
-      except (ValueError, IndexError):
-        raise HTTPBadRequest("Bad request version (%r)" % version)
-
-      if version_number >= (2, 0):
-        raise HTTPVersionNotSupported("Invalid HTTP Version (%s)" %
-                                      base_version_number)
-
-    elif len(words) == 2:
-      version = HTTP_0_9
-      [method, path] = words
-      if method != HTTP_GET:
-        raise HTTPBadRequest("Bad HTTP/0.9 request type (%r)" % method)
+    buf.write("\r\n")
 
-    else:
-      raise HTTPBadRequest("Bad request syntax (%r)" % requestline)
+    # Add message body if needed
+    if self.HasMessageBody():
+      buf.write(self._msg.body)
 
-    # Examine the headers and look for a Connection directive
-    headers = mimetools.Message(self.rfile, 0)
+    elif self._msg.body:
+      logging.warning("Ignoring message body")
 
-    self.request_method = method
-    self.request_path = path
-    self.request_version = version
-    self.request_headers = headers
+    return buf.getvalue()
 
-  def _ReadPostData(self):
-    """Reads POST/PUT data
+  def HasMessageBody(self):
+    """Checks whether the HTTP message contains a body.
 
-    Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
-    a request is signaled by the inclusion of a Content-Length header field in
-    the request message headers. HTTP/1.0 requests containing an entity body
-    must include a valid Content-Length header field."
+    Can be overriden by subclasses.
 
     """
-    # While not according to specification, we only support an entity body for
-    # POST and PUT.
-    if (not self.request_method or
-        self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
-      self.request_post_data = None
-      return
-
-    content_length = None
-    try:
-      if HTTP_CONTENT_LENGTH in self.request_headers:
-        content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
-    except TypeError:
-      pass
-    except ValueError:
-      pass
+    return bool(self._msg.body)
 
-    # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
-    if content_length is None:
-      raise HTTPLengthRequired("Missing Content-Length header or"
-                               " invalid format")
 
-    data = self.rfile.read(content_length)
-
-    # TODO: Content-type, error handling
-    if data:
-      self.request_post_data = HTTPJsonConverter().Decode(data)
-    else:
-      self.request_post_data = None
-
-    logging.debug("HTTP POST data: %s", self.request_post_data)
-
-
-class HttpServer(HttpSocketBase):
-  """Generic HTTP server class
-
-  Users of this class must subclass it and override the HandleRequest function.
+class HttpMessageReader(object):
+  """Reads HTTP message from socket.
 
   """
-  MAX_CHILDREN = 20
-
-  def __init__(self, mainloop, local_address, port,
-               ssl_params=None, ssl_verify_peer=False):
-    """Initializes the HTTP server
-
-    @type mainloop: ganeti.daemon.Mainloop
-    @param mainloop: Mainloop used to poll for I/O events
-    @type local_addess: string
-    @param local_address: Local IP address to bind to
-    @type port: int
-    @param port: TCP port to listen on
-    @type ssl_params: HttpSslParams
-    @param ssl_params: SSL key and certificate
-    @type ssl_verify_peer: bool
-    @param ssl_verify_peer: Whether to require client certificate and compare
-                            it with our certificate
-
-    """
-    HttpSocketBase.__init__(self)
-
-    self.mainloop = mainloop
-    self.local_address = local_address
-    self.port = port
-
-    self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
-
-    # Allow port to be reused
-    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-    if self._using_ssl:
-      self._fileio_class = _SSLFileObject
-    else:
-      self._fileio_class = socket._fileobject
-
-    self._children = []
-
-    mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
-    mainloop.RegisterSignal(self)
-
-  def Start(self):
-    self.socket.bind((self.local_address, self.port))
-    self.socket.listen(5)
-
-  def Stop(self):
-    self.socket.close()
-
-  def OnIO(self, fd, condition):
-    if condition & select.POLLIN:
-      self._IncomingConnection()
-
-  def OnSignal(self, signum):
-    if signum == signal.SIGCHLD:
-      self._CollectChildren(True)
-
-  def _CollectChildren(self, quick):
-    """Checks whether any child processes are done
-
-    @type quick: bool
-    @param quick: Whether to only use non-blocking functions
-
-    """
-    if not quick:
-      # Don't wait for other processes if it should be a quick check
-      while len(self._children) > self.MAX_CHILDREN:
-        try:
-          # Waiting without a timeout brings us into a potential DoS situation.
-          # As soon as too many children run, we'll not respond to new
-          # requests. The real solution would be to add a timeout for children
-          # and killing them after some time.
-          pid, status = os.waitpid(0, 0)
-        except os.error:
-          pid = None
-        if pid and pid in self._children:
-          self._children.remove(pid)
-
-    for child in self._children:
-      try:
-        pid, status = os.waitpid(child, os.WNOHANG)
-      except os.error:
-        pid = None
-      if pid and pid in self._children:
-        self._children.remove(pid)
-
-  def _IncomingConnection(self):
-    """Called for each incoming connection
-
-    """
-    (connection, client_addr) = self.socket.accept()
-
-    self._CollectChildren(False)
-
-    pid = os.fork()
-    if pid == 0:
-      # Child process
-      try:
-        HttpServerRequestExecutor(self, connection, client_addr,
-                                  self._fileio_class)
-      except:
-        logging.exception("Error while handling request from %s:%s",
-                          client_addr[0], client_addr[1])
-        os._exit(1)
-      os._exit(0)
-    else:
-      self._children.append(pid)
-
-  def HandleRequest(self, req):
-    raise NotImplementedError()
-
-
-class HttpClientRequest(object):
-  def __init__(self, host, port, method, path, headers=None, post_data=None,
-               ssl_params=None, ssl_verify_peer=False):
-    """Describes an HTTP request.
-
-    @type host: string
-    @param host: Hostname
-    @type port: int
-    @param port: Port
-    @type method: string
-    @param method: Method name
-    @type path: string
-    @param path: Request path
-    @type headers: dict or None
-    @param headers: Additional headers to send
-    @type post_data: string or None
-    @param post_data: Additional data to send
-    @type ssl_params: HttpSslParams
-    @param ssl_params: SSL key and certificate
-    @type ssl_verify_peer: bool
-    @param ssl_verify_peer: Whether to compare our certificate with server's
-                            certificate
-
-    """
-    if post_data is not None:
-      assert method.upper() in (HTTP_POST, HTTP_PUT), \
-        "Only POST and GET requests support sending data"
-
-    assert path.startswith("/"), "Path must start with slash (/)"
-
-    self.host = host
-    self.port = port
-    self.ssl_params = ssl_params
-    self.ssl_verify_peer = ssl_verify_peer
-    self.method = method
-    self.path = path
-    self.headers = headers
-    self.post_data = post_data
-
-    self.success = None
-    self.error = None
-
-    self.resp_status_line = None
-    self.resp_version = None
-    self.resp_status = None
-    self.resp_reason = None
-    self.resp_headers = None
-    self.resp_body = None
-
-
-class HttpClientRequestExecutor(HttpSocketBase):
-  # Default headers
-  DEFAULT_HEADERS = {
-    HTTP_USER_AGENT: HTTP_GANETI_VERSION,
-    # TODO: For keep-alive, don't send "Connection: close"
-    HTTP_CONNECTION: "close",
-    }
-
   # Length limits
-  STATUS_LINE_LENGTH_MAX = 512
-  HEADER_LENGTH_MAX = 4 * 1024
-
-  # Timeouts in seconds for socket layer
-  # TODO: Make read timeout configurable per OpCode
-  CONNECT_TIMEOUT = 5.0
-  WRITE_TIMEOUT = 10
-  READ_TIMEOUT = None
-  CLOSE_TIMEOUT = 1
+  START_LINE_LENGTH_MAX = None
+  HEADER_LENGTH_MAX = None
 
   # Parser state machine
-  PS_STATUS_LINE = "status-line"
+  PS_START_LINE = "start-line"
   PS_HEADERS = "headers"
-  PS_BODY = "body"
+  PS_BODY = "entity-body"
   PS_COMPLETE = "complete"
 
-  def __init__(self, req):
-    """Initializes the HttpClientRequestExecutor class.
+  def __init__(self, sock, msg, read_timeout):
+    """Reads an HTTP message from a socket.
 
-    @type req: HttpClientRequest
-    @param req: Request object
+    @type sock: socket
+    @param sock: Socket to be read from
+    @type msg: http.HttpMessage
+    @param msg: Object for the read message
+    @type read_timeout: float
+    @param read_timeout: Read timeout for socket
 
     """
-    HttpSocketBase.__init__(self)
+    self.sock = sock
+    self.msg = msg
 
-    self.request = req
-
-    self.parser_status = self.PS_STATUS_LINE
+    self.start_line_buffer = None
     self.header_buffer = StringIO()
     self.body_buffer = StringIO()
+    self.parser_status = self.PS_START_LINE
     self.content_length = None
-    self.server_will_close = None
-
-    self.poller = select.poll()
-
-    try:
-      # TODO: Implement connection caching/keep-alive
-      self.sock = self._CreateSocket(req.ssl_params,
-                                     req.ssl_verify_peer)
-
-      # Disable Python's timeout
-      self.sock.settimeout(None)
-
-      # Operate in non-blocking mode
-      self.sock.setblocking(0)
-
-      force_close = True
-      self._Connect()
-      try:
-        self._SendRequest()
-        self._ReadResponse()
-
-        # Only wait for server to close if we didn't have any exception.
-        force_close = False
-      finally:
-        self._CloseConnection(force_close)
-
-      self.sock.close()
-      self.sock = None
-
-      req.resp_body = self.body_buffer.getvalue()
-
-      req.success = True
-      req.error = None
-
-    except _HttpClientError, err:
-      req.success = False
-      req.error = str(err)
-
-  def _BuildRequest(self):
-    """Build HTTP request.
-
-    @rtype: string
-    @return: Complete request
-
-    """
-    # Headers
-    send_headers = self.DEFAULT_HEADERS.copy()
-
-    if self.request.headers:
-      send_headers.update(self.request.headers)
-
-    send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
-
-    if self.request.post_data:
-      send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
-
-    buf = StringIO()
-
-    # Add request line. We only support HTTP/1.0 (no chunked transfers and no
-    # keep-alive).
-    # TODO: For keep-alive, change to HTTP/1.1
-    buf.write("%s %s %s\r\n" % (self.request.method.upper(),
-                                self.request.path, HTTP_1_0))
-
-    # Add headers
-    for name, value in send_headers.iteritems():
-      buf.write("%s: %s\r\n" % (name, value))
+    self.peer_will_close = None
 
-    buf.write("\r\n")
-
-    if self.request.post_data:
-      buf.write(self.request.post_data)
-
-    return buf.getvalue()
-
-  def _ParseStatusLine(self):
-    """Parses the status line sent by the server.
-
-    """
-    line = self.request.resp_status_line
-
-    if not line:
-      raise _HttpClientError("Empty status line")
-
-    try:
-      [version, status, reason] = line.split(None, 2)
-    except ValueError:
-      try:
-        [version, status] = line.split(None, 1)
-        reason = ""
-      except ValueError:
-        version = HTTP_9_0
-
-    if version:
-      version = version.upper()
-
-    if version not in (HTTP_1_0, HTTP_1_1):
-      # We do not support HTTP/0.9, despite the specification requiring it
-      # (RFC2616, section 19.6)
-      raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
-                             line)
-
-    # The status code is a three-digit number
-    try:
-      status = int(status)
-      if status < 100 or status > 999:
-        status = -1
-    except ValueError:
-      status = -1
-
-    if status == -1:
-      raise _HttpClientError("Invalid status code (%r)" % line)
-
-    self.request.resp_version = version
-    self.request.resp_status = status
-    self.request.resp_reason = reason
-
-  def _WillServerCloseConnection(self):
-    """Evaluate whether server will close the connection.
-
-    @rtype: bool
-    @return: Whether server will close the connection
-
-    """
-    hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
-    if hdr_connection:
-      hdr_connection = hdr_connection.lower()
-
-    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
-    if self.request.resp_version == HTTP_1_1:
-      return (hdr_connection and "close" in hdr_connection)
-
-    # Some HTTP/1.0 implementations have support for persistent connections,
-    # using rules different than HTTP/1.1.
-
-    # For older HTTP, Keep-Alive indicates persistent connection.
-    if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
-      return False
-
-    # At least Akamai returns a "Connection: Keep-Alive" header, which was
-    # supposed to be sent by the client.
-    if hdr_connection and "keep-alive" in hdr_connection:
-      return False
-
-    return True
-
-  def _ParseHeaders(self):
-    """Parses the headers sent by the server.
-
-    This function also adjusts internal variables based on the header values.
+    buf = ""
+    eof = False
+    while self.parser_status != self.PS_COMPLETE:
+      # TODO: Don't read more than necessary (Content-Length), otherwise
+      # data might be lost and/or an error could occur
+      data = SocketOperation(sock, SOCKOP_RECV, SOCK_BUF_SIZE, read_timeout)
 
-    """
-    req = self.request
+      if data:
+        buf += data
+      else:
+        eof = True
 
-    # Parse headers
-    self.header_buffer.seek(0, 0)
-    req.resp_headers = mimetools.Message(self.header_buffer, 0)
+      # Do some parsing and error checking while more data arrives
+      buf = self._ContinueParsing(buf, eof)
 
-    self.server_will_close = self._WillServerCloseConnection()
+      # Must be done only after the buffer has been evaluated
+      # TODO: Connection-length < len(data read) and connection closed
+      if (eof and
+          self.parser_status in (self.PS_START_LINE,
+                                 self.PS_HEADERS)):
+        raise HttpError("Connection closed prematurely")
 
-    # Do we have a Content-Length header?
-    hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
-    if hdr_content_length:
-      try:
-        self.content_length = int(hdr_content_length)
-      except ValueError:
-        pass
-      if self.content_length is not None and self.content_length < 0:
-        self.content_length = None
+    # Parse rest
+    buf = self._ContinueParsing(buf, True)
 
-    # does the body have a fixed length? (of zero)
-    if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
-        100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
-      self.content_length = 0
+    assert self.parser_status == self.PS_COMPLETE
+    assert not buf, "Parser didn't read full response"
 
-    # if the connection remains open and a content-length was not provided,
-    # then assume that the connection WILL close.
-    if self.content_length is None:
-      self.server_will_close = True
+    msg.body = self.body_buffer.getvalue()
 
-  def _CheckStatusLineLength(self, length):
-    if length > self.STATUS_LINE_LENGTH_MAX:
-      raise _HttpClientError("Status line longer than %d chars" %
-                             self.STATUS_LINE_LENGTH_MAX)
+    # TODO: Content-type, error handling
+    if msg.body:
+      msg.decoded_body = HttpJsonConverter().Decode(msg.body)
+    else:
+      msg.decoded_body = None
 
-  def _CheckHeaderLength(self, length):
-    if length > self.HEADER_LENGTH_MAX:
-      raise _HttpClientError("Headers longer than %d chars" %
-                             self.HEADER_LENGTH_MAX)
+    if msg.decoded_body:
+      logging.debug("Message body: %s", msg.decoded_body)
 
-  def _ParseBuffer(self, buf, eof):
-    """Main function for HTTP response state machine.
+  def _ContinueParsing(self, buf, eof):
+    """Main function for HTTP message state machine.
 
     @type buf: string
     @param buf: Receive buffer
@@ -1115,25 +811,41 @@ class HttpClientRequestExecutor(HttpSocketBase):
     @return: Updated receive buffer
 
     """
-    if self.parser_status == self.PS_STATUS_LINE:
-      # Expect status line
-      idx = buf.find("\r\n")
-      if idx >= 0:
-        self.request.resp_status_line = buf[:idx]
-
-        self._CheckStatusLineLength(len(self.request.resp_status_line))
+    # TODO: Use offset instead of slicing when possible
+    if self.parser_status == self.PS_START_LINE:
+      # Expect start line
+      while True:
+        idx = buf.find("\r\n")
+
+        # RFC2616, section 4.1: "In the interest of robustness, servers SHOULD
+        # ignore any empty line(s) received where a Request-Line is expected.
+        # In other words, if the server is reading the protocol stream at the
+        # beginning of a message and receives a CRLF first, it should ignore
+        # the CRLF."
+        if idx == 0:
+          # TODO: Limit number of CRLFs/empty lines for safety?
+          buf = buf[:2]
+          continue
+
+        if idx > 0:
+          self.start_line_buffer = buf[:idx]
+
+          self._CheckStartLineLength(len(self.start_line_buffer))
+
+          # Remove status line, including CRLF
+          buf = buf[idx + 2:]
+
+          self.msg.start_line = self.ParseStartLine(self.start_line_buffer)
+
+          self.parser_status = self.PS_HEADERS
+        else:
+          # Check whether incoming data is getting too large, otherwise we just
+          # fill our read buffer.
+          self._CheckStartLineLength(len(buf))
 
-        # Remove status line, including CRLF
-        buf = buf[idx + 2:]
-
-        self._ParseStatusLine()
-
-        self.parser_status = self.PS_HEADERS
-      else:
-        # Check whether incoming data is getting too large, otherwise we just
-        # fill our read buffer.
-        self._CheckStatusLineLength(len(buf))
+        break
 
+    # TODO: Handle messages without headers
     if self.parser_status == self.PS_HEADERS:
       # Wait for header end
       idx = buf.find("\r\n\r\n")
@@ -1154,324 +866,127 @@ class HttpClientRequestExecutor(HttpSocketBase):
         self._CheckHeaderLength(len(buf))
 
     if self.parser_status == self.PS_BODY:
+      # TODO: Implement max size for body_buffer
       self.body_buffer.write(buf)
       buf = ""
 
       # Check whether we've read everything
+      #
+      # RFC2616, section 4.4: "When a message-body is included with a message,
+      # the transfer-length of that body is determined by one of the following
+      # [...] 5. By the server closing the connection. (Closing the connection
+      # cannot be used to indicate the end of a request body, since that would
+      # leave no possibility for the server to send back a response.)"
+      #
+      # TODO: Error when buffer length > Content-Length header
       if (eof or
+          self.content_length is None or
           (self.content_length is not None and
            self.body_buffer.tell() >= self.content_length)):
         self.parser_status = self.PS_COMPLETE
 
     return buf
 
-  def _Connect(self):
-    """Non-blocking connect to host with timeout.
-
-    """
-    connected = False
-    while True:
-      try:
-        connect_error = self.sock.connect_ex((self.request.host,
-                                              self.request.port))
-      except socket.gaierror, err:
-        raise _HttpClientError("Connection failed: %s" % str(err))
-
-      if connect_error == errno.EINTR:
-        # Mask signals
-        pass
-
-      elif connect_error == 0:
-        # Connection established
-        connected = True
-        break
-
-      elif connect_error == errno.EINPROGRESS:
-        # Connection started
-        break
-
-      raise _HttpClientError("Connection failed (%s: %s)" %
-                             (connect_error, os.strerror(connect_error)))
-
-    if not connected:
-      # Wait for connection
-      event = WaitForSocketCondition(self.poller, self.sock,
-                                     select.POLLOUT, self.CONNECT_TIMEOUT)
-      if event is None:
-        raise _HttpClientError("Timeout while connecting to server")
-
-      # Get error code
-      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
-      if connect_error != 0:
-        raise _HttpClientError("Connection failed (%s: %s)" %
-                               (connect_error, os.strerror(connect_error)))
-
-    # Enable TCP keep-alive
-    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+  def _CheckStartLineLength(self, length):
+    """Limits the start line buffer size.
 
-    # If needed, Linux specific options are available to change the TCP
-    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
-    # TCP_KEEPINTVL.
-
-  def _SendRequest(self):
-    """Sends request to server.
+    @type length: int
+    @param length: Buffer size
 
     """
-    buf = self._BuildRequest()
-
-    while buf:
-      # Send only 4 KB at a time
-      data = buf[:4096]
+    if (self.START_LINE_LENGTH_MAX is not None and
+        length > self.START_LINE_LENGTH_MAX):
+      raise HttpError("Start line longer than %d chars" %
+                       self.START_LINE_LENGTH_MAX)
 
-      try:
-        sent = SocketOperation(self.poller, self.sock, SOCKOP_SEND, data,
-                               self.WRITE_TIMEOUT)
-      except HttpSocketTimeout:
-        raise _HttpClientError("Timeout while sending request")
-      except socket.error, err:
-        raise _HttpClientError("Error sending request: %s" % err)
-
-      # Remove sent bytes
-      buf = buf[sent:]
-
-    assert not buf, "Request wasn't sent completely"
-
-  def _ReadResponse(self):
-    """Read response from server.
+  def _CheckHeaderLength(self, length):
+    """Limits the header buffer size.
 
-    Calls the parser function after reading a chunk of data.
+    @type length: int
+    @param length: Buffer size
 
     """
-    buf = ""
-    eof = False
-    while self.parser_status != self.PS_COMPLETE:
-      try:
-        data = SocketOperation(self.poller, self.sock, SOCKOP_RECV, 4096,
-                               self.READ_TIMEOUT)
-      except HttpSocketTimeout:
-        raise _HttpClientError("Timeout while reading response")
-      except socket.error, err:
-        raise _HttpClientError("Error while reading response: %s" % err)
-
-      if data:
-        buf += data
-      else:
-        eof = True
-
-      # Do some parsing and error checking while more data arrives
-      buf = self._ParseBuffer(buf, eof)
+    if (self.HEADER_LENGTH_MAX is not None and
+        length > self.HEADER_LENGTH_MAX):
+      raise HttpError("Headers longer than %d chars" % self.HEADER_LENGTH_MAX)
 
-      # Must be done only after the buffer has been evaluated
-      if (eof and
-          self.parser_status in (self.PS_STATUS_LINE,
-                                 self.PS_HEADERS)):
-        raise _HttpClientError("Connection closed prematurely")
+  def ParseStartLine(self, start_line):
+    """Parses the start line of a message.
 
-    # Parse rest
-    buf = self._ParseBuffer(buf, True)
+    Must be overriden by subclass.
 
-    assert self.parser_status == self.PS_COMPLETE
-    assert not buf, "Parser didn't read full response"
-
-  def _CloseConnection(self, force):
-    """Closes the connection.
+    @type start_line: string
+    @param start_line: Start line string
 
     """
-    if self.server_will_close and not force:
-      # Wait for server to close
-      try:
-        # Check whether it's actually closed
-        if not SocketOperation(self.poller, self.sock, SOCKOP_RECV, 1,
-                               self.CLOSE_TIMEOUT):
-          return
-      except (socket.error, _HttpClientError, HttpSocketTimeout):
-        # Ignore errors at this stage
-        pass
-
-    # Close the connection from our side
-    try:
-      SocketOperation(self.poller, self.sock, SOCKOP_SHUTDOWN,
-                      socket.SHUT_RDWR, self.WRITE_TIMEOUT)
-    except HttpSocketTimeout:
-      raise _HttpClientError("Timeout while shutting down connection")
-    except socket.error, err:
-      raise _HttpClientError("Error while shutting down connection: %s" % err)
-
-
-class _HttpClientPendingRequest(object):
-  """Data class for pending requests.
-
-  """
-  def __init__(self, request):
-    self.request = request
-
-    # Thread synchronization
-    self.done = threading.Event()
-
-
-class HttpClientWorker(workerpool.BaseWorker):
-  """HTTP client worker class.
-
-  """
-  def RunTask(self, pend_req):
-    try:
-      HttpClientRequestExecutor(pend_req.request)
-    finally:
-      pend_req.done.set()
-
-
-class HttpClientWorkerPool(workerpool.WorkerPool):
-  def __init__(self, manager):
-    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
-                                   HttpClientWorker)
-    self.manager = manager
-
-
-class HttpClientManager(object):
-  """Manages HTTP requests.
-
-  """
-  def __init__(self):
-    self._wpool = HttpClientWorkerPool(self)
-
-  def __del__(self):
-    self.Shutdown()
-
-  def ExecRequests(self, requests):
-    """Execute HTTP requests.
+    raise NotImplementedError()
 
-    This function can be called from multiple threads at the same time.
+  def _WillPeerCloseConnection(self):
+    """Evaluate whether peer will close the connection.
 
-    @type requests: List of HttpClientRequest instances
-    @param requests: The requests to execute
-    @rtype: List of HttpClientRequest instances
-    @returns: The list of requests passed in
+    @rtype: bool
+    @return: Whether peer will close the connection
 
     """
-    # _HttpClientPendingRequest is used for internal thread synchronization
-    pending = [_HttpClientPendingRequest(req) for req in requests]
-
-    try:
-      # Add requests to queue
-      for pend_req in pending:
-        self._wpool.AddTask(pend_req)
-
-    finally:
-      # In case of an exception we should still wait for the rest, otherwise
-      # another thread from the worker pool could modify the request object
-      # after we returned.
-
-      # And wait for them to finish
-      for pend_req in pending:
-        pend_req.done.wait()
-
-    # Return original list
-    return requests
-
-  def Shutdown(self):
-    self._wpool.Quiesce()
-    self._wpool.TerminateWorkers()
-
-
-class _SSLFileObject(object):
-  """Wrapper around socket._fileobject
-
-  This wrapper is required to handle OpenSSL exceptions.
-
-  """
-  def _RequireOpenSocket(fn):
-    def wrapper(self, *args, **kwargs):
-      if self.closed:
-        raise SocketClosed("Socket is closed")
-      return fn(self, *args, **kwargs)
-    return wrapper
-
-  def __init__(self, sock, mode='rb', bufsize=-1):
-    self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
-
-  def _ConnectionLost(self):
-    self._base = None
-
-  def _getclosed(self):
-    return self._base is None or self._base.closed
-  closed = property(_getclosed, doc="True if the file is closed")
-
-  @_RequireOpenSocket
-  def close(self):
-    return self._base.close()
-
-  @_RequireOpenSocket
-  def flush(self):
-    return self._base.flush()
-
-  @_RequireOpenSocket
-  def fileno(self):
-    return self._base.fileno()
+    # RFC2616, section 14.10: "HTTP/1.1 defines the "close" connection option
+    # for the sender to signal that the connection will be closed after
+    # completion of the response. For example,
+    #
+    #        Connection: close
+    #
+    # in either the request or the response header fields indicates that the
+    # connection SHOULD NOT be considered `persistent' (section 8.1) after the
+    # current request/response is complete."
+
+    hdr_connection = self.msg.headers.get(HTTP_CONNECTION, None)
+    if hdr_connection:
+      hdr_connection = hdr_connection.lower()
 
-  @_RequireOpenSocket
-  def read(self, size=-1):
-    return self._ReadWrapper(self._base.read, size=size)
+    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
+    if self.msg.start_line.version == HTTP_1_1:
+      return (hdr_connection and "close" in hdr_connection)
 
-  @_RequireOpenSocket
-  def readline(self, size=-1):
-    return self._ReadWrapper(self._base.readline, size=size)
+    # Some HTTP/1.0 implementations have support for persistent connections,
+    # using rules different than HTTP/1.1.
 
-  def _ReadWrapper(self, fn, *args, **kwargs):
-    while True:
-      try:
-        return fn(*args, **kwargs)
+    # For older HTTP, Keep-Alive indicates persistent connection.
+    if self.msg.headers.get(HTTP_KEEP_ALIVE):
+      return False
 
-      except OpenSSL.SSL.ZeroReturnError, err:
-        self._ConnectionLost()
-        return ""
+    # At least Akamai returns a "Connection: Keep-Alive" header, which was
+    # supposed to be sent by the client.
+    if hdr_connection and "keep-alive" in hdr_connection:
+      return False
 
-      except OpenSSL.SSL.WantReadError:
-        continue
+    return True
 
-      #except OpenSSL.SSL.WantWriteError:
-      # TODO
+  def _ParseHeaders(self):
+    """Parses the headers.
 
-      except OpenSSL.SSL.SysCallError, (retval, desc):
-        if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
-            or retval > 0):
-          self._ConnectionLost()
-          return ""
+    This function also adjusts internal variables based on header values.
 
-        logging.exception("Error in OpenSSL")
-        self._ConnectionLost()
-        raise socket.error(err.args)
+    RFC2616, section 4.3: The presence of a message-body in a request is
+    signaled by the inclusion of a Content-Length or Transfer-Encoding header
+    field in the request's message-headers.
 
-      except OpenSSL.SSL.Error, err:
-        self._ConnectionLost()
-        raise socket.error(err.args)
+    """
+    # Parse headers
+    self.header_buffer.seek(0, 0)
+    self.msg.headers = mimetools.Message(self.header_buffer, 0)
 
-  @_RequireOpenSocket
-  def write(self, data):
-    return self._WriteWrapper(self._base.write, data)
+    self.peer_will_close = self._WillPeerCloseConnection()
 
-  def _WriteWrapper(self, fn, *args, **kwargs):
-    while True:
+    # Do we have a Content-Length header?
+    hdr_content_length = self.msg.headers.get(HTTP_CONTENT_LENGTH, None)
+    if hdr_content_length:
       try:
-        return fn(*args, **kwargs)
-      except OpenSSL.SSL.ZeroReturnError, err:
-        self._ConnectionLost()
-        return 0
-
-      except OpenSSL.SSL.WantWriteError:
-        continue
-
-      #except OpenSSL.SSL.WantReadError:
-      # TODO
-
-      except OpenSSL.SSL.SysCallError, err:
-        if err.args[0] == -1 and data == "":
-          # errors when writing empty strings are expected
-          # and can be ignored
-          return 0
-
-        self._ConnectionLost()
-        raise socket.error(err.args)
+        self.content_length = int(hdr_content_length)
+      except ValueError:
+        self.content_length = None
+      if self.content_length is not None and self.content_length < 0:
+        self.content_length = None
 
-      except OpenSSL.SSL.Error, err:
-        self._ConnectionLost()
-        raise socket.error(err.args)
+    # if the connection remains open and a content-length was not provided,
+    # then assume that the connection WILL close.
+    if self.content_length is None:
+      self.peer_will_close = True