Add targetted pylint disables
[ganeti-local] / lib / http / __init__.py
index b4f8c3b..c98fa58 100644 (file)
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301, USA.
 
-"""HTTP server module.
+"""HTTP module.
 
 """
 
 
 """
 
-import BaseHTTPServer
-import cgi
 import logging
 import mimetools
 import OpenSSL
 import logging
 import mimetools
 import OpenSSL
-import os
 import select
 import socket
 import select
 import socket
-import sys
-import time
-import signal
-import logging
 import errno
 import errno
-import threading
 
 from cStringIO import StringIO
 
 from ganeti import constants
 from ganeti import serializer
 
 from cStringIO import StringIO
 
 from ganeti import constants
 from ganeti import serializer
-from ganeti import workerpool
 from ganeti import utils
 
 
 from ganeti import utils
 
 
-HTTP_CLIENT_THREADS = 10
-
 HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
 
 HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
 
-WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
-MONTHNAME = [None,
-             'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
-             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
-
-# Default error message
-DEFAULT_ERROR_CONTENT_TYPE = "text/html"
-DEFAULT_ERROR_MESSAGE = """\
-<head>
-<title>Error response</title>
-</head>
-<body>
-<h1>Error response</h1>
-<p>Error code %(code)d.
-<p>Message: %(message)s.
-<p>Error code explanation: %(code)s = %(explain)s.
-</body>
-"""
-
 HTTP_OK = 200
 HTTP_NO_CONTENT = 204
 HTTP_NOT_MODIFIED = 304
 HTTP_OK = 200
 HTTP_NO_CONTENT = 204
 HTTP_NOT_MODIFIED = 304
@@ -80,6 +50,7 @@ HTTP_GET = "GET"
 HTTP_HEAD = "HEAD"
 HTTP_POST = "POST"
 HTTP_PUT = "PUT"
 HTTP_HEAD = "HEAD"
 HTTP_POST = "POST"
 HTTP_PUT = "PUT"
+HTTP_DELETE = "DELETE"
 
 HTTP_ETAG = "ETag"
 HTTP_HOST = "Host"
 
 HTTP_ETAG = "ETag"
 HTTP_HOST = "Host"
@@ -90,21 +61,42 @@ HTTP_CONTENT_TYPE = "Content-Type"
 HTTP_CONTENT_LENGTH = "Content-Length"
 HTTP_CONNECTION = "Connection"
 HTTP_KEEP_ALIVE = "Keep-Alive"
 HTTP_CONTENT_LENGTH = "Content-Length"
 HTTP_CONNECTION = "Connection"
 HTTP_KEEP_ALIVE = "Keep-Alive"
+HTTP_WWW_AUTHENTICATE = "WWW-Authenticate"
+HTTP_AUTHORIZATION = "Authorization"
+HTTP_AUTHENTICATION_INFO = "Authentication-Info"
+HTTP_ALLOW = "Allow"
 
 _SSL_UNEXPECTED_EOF = "Unexpected EOF"
 
 # Socket operations
 (SOCKOP_SEND,
  SOCKOP_RECV,
 
 _SSL_UNEXPECTED_EOF = "Unexpected EOF"
 
 # Socket operations
 (SOCKOP_SEND,
  SOCKOP_RECV,
- SOCKOP_SHUTDOWN) = range(3)
+ SOCKOP_SHUTDOWN,
+ SOCKOP_HANDSHAKE) = range(4)
+
+# send/receive quantum
+SOCK_BUF_SIZE = 32768
+
+
+class HttpError(Exception):
+  """Internal exception for HTTP errors.
+
+  This should only be used for internal error reporting.
+
+  """
 
 
 
 
-class SocketClosed(socket.error):
-  pass
+class HttpConnectionClosed(Exception):
+  """Internal exception for a closed connection.
+
+  This should only be used for internal error reporting. Only use
+  it if there's no other way to report this condition.
+
+  """
 
 
 
 
-class _HttpClientError(Exception):
-  """Internal exception for HTTP client errors.
+class HttpSessionHandshakeUnexpectedEOF(HttpError):
+  """Internal exception for errors during SSL handshake.
 
   This should only be used for internal error reporting.
 
 
   This should only be used for internal error reporting.
 
@@ -123,45 +115,187 @@ class HttpException(Exception):
   code = None
   message = None
 
   code = None
   message = None
 
-  def __init__(self, message=None):
+  def __init__(self, message=None, headers=None):
     Exception.__init__(self)
     Exception.__init__(self)
-    if message is not None:
-      self.message = message
+    self.message = message
+    self.headers = headers
 
 
 class HttpBadRequest(HttpException):
 
 
 class HttpBadRequest(HttpException):
+  """400 Bad Request
+
+  RFC2616, 10.4.1: The request could not be understood by the server
+  due to malformed syntax. The client SHOULD NOT repeat the request
+  without modifications.
+
+  """
   code = 400
 
 
   code = 400
 
 
+class HttpUnauthorized(HttpException):
+  """401 Unauthorized
+
+  RFC2616, section 10.4.2: The request requires user
+  authentication. The response MUST include a WWW-Authenticate header
+  field (section 14.47) containing a challenge applicable to the
+  requested resource.
+
+  """
+  code = 401
+
+
 class HttpForbidden(HttpException):
 class HttpForbidden(HttpException):
+  """403 Forbidden
+
+  RFC2616, 10.4.4: The server understood the request, but is refusing
+  to fulfill it.  Authorization will not help and the request SHOULD
+  NOT be repeated.
+
+  """
   code = 403
 
 
 class HttpNotFound(HttpException):
   code = 403
 
 
 class HttpNotFound(HttpException):
+  """404 Not Found
+
+  RFC2616, 10.4.5: The server has not found anything matching the
+  Request-URI.  No indication is given of whether the condition is
+  temporary or permanent.
+
+  """
   code = 404
 
 
   code = 404
 
 
+class HttpMethodNotAllowed(HttpException):
+  """405 Method Not Allowed
+
+  RFC2616, 10.4.6: The method specified in the Request-Line is not
+  allowed for the resource identified by the Request-URI. The response
+  MUST include an Allow header containing a list of valid methods for
+  the requested resource.
+
+  """
+  code = 405
+
+
+class HttpRequestTimeout(HttpException):
+  """408 Request Timeout
+
+  RFC2616, 10.4.9: The client did not produce a request within the
+  time that the server was prepared to wait. The client MAY repeat the
+  request without modifications at any later time.
+
+  """
+  code = 408
+
+
+class HttpConflict(HttpException):
+  """409 Conflict
+
+  RFC2616, 10.4.10: The request could not be completed due to a
+  conflict with the current state of the resource. This code is only
+  allowed in situations where it is expected that the user might be
+  able to resolve the conflict and resubmit the request.
+
+  """
+  code = 409
+
+
 class HttpGone(HttpException):
 class HttpGone(HttpException):
+  """410 Gone
+
+  RFC2616, 10.4.11: The requested resource is no longer available at
+  the server and no forwarding address is known. This condition is
+  expected to be considered permanent.
+
+  """
   code = 410
 
 
 class HttpLengthRequired(HttpException):
   code = 410
 
 
 class HttpLengthRequired(HttpException):
+  """411 Length Required
+
+  RFC2616, 10.4.12: The server refuses to accept the request without a
+  defined Content-Length. The client MAY repeat the request if it adds
+  a valid Content-Length header field containing the length of the
+  message-body in the request message.
+
+  """
   code = 411
 
 
   code = 411
 
 
-class HttpInternalError(HttpException):
+class HttpPreconditionFailed(HttpException):
+  """412 Precondition Failed
+
+  RFC2616, 10.4.13: The precondition given in one or more of the
+  request-header fields evaluated to false when it was tested on the
+  server.
+
+  """
+  code = 412
+
+
+class HttpInternalServerError(HttpException):
+  """500 Internal Server Error
+
+  RFC2616, 10.5.1: The server encountered an unexpected condition
+  which prevented it from fulfilling the request.
+
+  """
   code = 500
 
 
 class HttpNotImplemented(HttpException):
   code = 500
 
 
 class HttpNotImplemented(HttpException):
+  """501 Not Implemented
+
+  RFC2616, 10.5.2: The server does not support the functionality
+  required to fulfill the request.
+
+  """
   code = 501
 
 
   code = 501
 
 
+class HttpBadGateway(HttpException):
+  """502 Bad Gateway
+
+  RFC2616, 10.5.3: The server, while acting as a gateway or proxy,
+  received an invalid response from the upstream server it accessed in
+  attempting to fulfill the request.
+
+  """
+  code = 502
+
+
 class HttpServiceUnavailable(HttpException):
 class HttpServiceUnavailable(HttpException):
+  """503 Service Unavailable
+
+  RFC2616, 10.5.4: The server is currently unable to handle the
+  request due to a temporary overloading or maintenance of the server.
+
+  """
   code = 503
 
 
   code = 503
 
 
+class HttpGatewayTimeout(HttpException):
+  """504 Gateway Timeout
+
+  RFC2616, 10.5.5: The server, while acting as a gateway or proxy, did
+  not receive a timely response from the upstream server specified by
+  the URI (e.g.  HTTP, FTP, LDAP) or some other auxiliary server
+  (e.g. DNS) it needed to access in attempting to complete the
+  request.
+
+  """
+  code = 504
+
+
 class HttpVersionNotSupported(HttpException):
 class HttpVersionNotSupported(HttpException):
+  """505 HTTP Version Not Supported
+
+  RFC2616, 10.5.6: The server does not support, or refuses to support,
+  the HTTP protocol version that was used in the request message.
+
+  """
   code = 505
 
 
   code = 505
 
 
@@ -175,13 +309,11 @@ class HttpJsonConverter:
     return serializer.LoadJson(data)
 
 
     return serializer.LoadJson(data)
 
 
-def WaitForSocketCondition(poller, sock, event, timeout):
+def WaitForSocketCondition(sock, event, timeout):
   """Waits for a condition to occur on the socket.
 
   """Waits for a condition to occur on the socket.
 
-  @type poller: select.Poller
-  @param poller: Poller object as created by select.poll()
   @type sock: socket
   @type sock: socket
-  @param socket: Wait for events on this socket
+  @param sock: Wait for events on this socket
   @type event: int
   @param event: ORed condition (see select module)
   @type timeout: float or None
   @type event: int
   @param event: ORed condition (see select module)
   @type timeout: float or None
@@ -197,6 +329,7 @@ def WaitForSocketCondition(poller, sock, event, timeout):
     # Poller object expects milliseconds
     timeout *= 1000
 
     # Poller object expects milliseconds
     timeout *= 1000
 
+  poller = select.poll()
   poller.register(sock, event)
   try:
     while True:
   poller.register(sock, event)
   try:
     while True:
@@ -214,36 +347,32 @@ def WaitForSocketCondition(poller, sock, event, timeout):
     poller.unregister(sock)
 
 
     poller.unregister(sock)
 
 
-def SocketOperation(poller, sock, op, arg1, timeout):
+def SocketOperation(sock, op, arg1, timeout):
   """Wrapper around socket functions.
 
   This function abstracts error handling for socket operations, especially
   for the complicated interaction with OpenSSL.
 
   """Wrapper around socket functions.
 
   This function abstracts error handling for socket operations, especially
   for the complicated interaction with OpenSSL.
 
-  @type poller: select.Poller
-  @param poller: Poller object as created by select.poll()
   @type sock: socket
   @type sock: socket
-  @param socket: Socket for the operation
+  @param sock: Socket for the operation
   @type op: int
   @param op: Operation to execute (SOCKOP_* constants)
   @type arg1: any
   @param arg1: Parameter for function (if needed)
   @type timeout: None or float
   @param timeout: Timeout in seconds or None
   @type op: int
   @param op: Operation to execute (SOCKOP_* constants)
   @type arg1: any
   @param arg1: Parameter for function (if needed)
   @type timeout: None or float
   @param timeout: Timeout in seconds or None
+  @return: Return value of socket function
 
   """
   # TODO: event_poll/event_check/override
 
   """
   # TODO: event_poll/event_check/override
-  if op == SOCKOP_SEND:
+  if op in (SOCKOP_SEND, SOCKOP_HANDSHAKE):
     event_poll = select.POLLOUT
     event_poll = select.POLLOUT
-    event_check = select.POLLOUT
 
   elif op == SOCKOP_RECV:
     event_poll = select.POLLIN
 
   elif op == SOCKOP_RECV:
     event_poll = select.POLLIN
-    event_check = select.POLLIN | select.POLLPRI
 
   elif op == SOCKOP_SHUTDOWN:
     event_poll = None
 
   elif op == SOCKOP_SHUTDOWN:
     event_poll = None
-    event_check = None
 
     # The timeout is only used when OpenSSL requests polling for a condition.
     # It is not advisable to have no timeout for shutdown.
 
     # The timeout is only used when OpenSSL requests polling for a condition.
     # It is not advisable to have no timeout for shutdown.
@@ -252,18 +381,23 @@ def SocketOperation(poller, sock, op, arg1, timeout):
   else:
     raise AssertionError("Invalid socket operation")
 
   else:
     raise AssertionError("Invalid socket operation")
 
+  # Handshake is only supported by SSL sockets
+  if (op == SOCKOP_HANDSHAKE and
+      not isinstance(sock, OpenSSL.SSL.ConnectionType)):
+    return
+
   # No override by default
   event_override = 0
 
   while True:
     # Poll only for certain operations and when asked for by an override
   # No override by default
   event_override = 0
 
   while True:
     # Poll only for certain operations and when asked for by an override
-    if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
+    if event_override or op in (SOCKOP_SEND, SOCKOP_RECV, SOCKOP_HANDSHAKE):
       if event_override:
         wait_for_event = event_override
       else:
         wait_for_event = event_poll
 
       if event_override:
         wait_for_event = event_override
       else:
         wait_for_event = event_poll
 
-      event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
+      event = WaitForSocketCondition(sock, wait_for_event, timeout)
       if event is None:
         raise HttpSocketTimeout()
 
       if event is None:
         raise HttpSocketTimeout()
 
@@ -292,6 +426,9 @@ def SocketOperation(poller, sock, op, arg1, timeout):
           else:
             return sock.shutdown(arg1)
 
           else:
             return sock.shutdown(arg1)
 
+        elif op == SOCKOP_HANDSHAKE:
+          return sock.do_handshake()
+
       except OpenSSL.SSL.WantWriteError:
         # OpenSSL wants to write, poll for POLLOUT
         event_override = select.POLLOUT
       except OpenSSL.SSL.WantWriteError:
         # OpenSSL wants to write, poll for POLLOUT
         event_override = select.POLLOUT
@@ -305,6 +442,21 @@ def SocketOperation(poller, sock, op, arg1, timeout):
       except OpenSSL.SSL.WantX509LookupError:
         continue
 
       except OpenSSL.SSL.WantX509LookupError:
         continue
 
+      except OpenSSL.SSL.ZeroReturnError, err:
+        # SSL Connection has been closed. In SSL 3.0 and TLS 1.0, this only
+        # occurs if a closure alert has occurred in the protocol, i.e. the
+        # connection has been closed cleanly. Note that this does not
+        # necessarily mean that the transport layer (e.g. a socket) has been
+        # closed.
+        if op == SOCKOP_SEND:
+          # Can happen during a renegotiation
+          raise HttpConnectionClosed(err.args)
+        elif op == SOCKOP_RECV:
+          return ""
+
+        # SSL_shutdown shouldn't return SSL_ERROR_ZERO_RETURN
+        raise socket.error(err.args)
+
       except OpenSSL.SSL.SysCallError, err:
         if op == SOCKOP_SEND:
           # arg1 is the data when writing
       except OpenSSL.SSL.SysCallError, err:
         if op == SOCKOP_SEND:
           # arg1 is the data when writing
@@ -313,9 +465,13 @@ def SocketOperation(poller, sock, op, arg1, timeout):
             # and can be ignored
             return 0
 
             # and can be ignored
             return 0
 
-        elif op == SOCKOP_RECV:
-          if err.args == (-1, _SSL_UNEXPECTED_EOF):
+        if err.args == (-1, _SSL_UNEXPECTED_EOF):
+          if op == SOCKOP_RECV:
             return ""
             return ""
+          elif op == SOCKOP_HANDSHAKE:
+            # Can happen if peer disconnects directly after the connection is
+            # opened.
+            raise HttpSessionHandshakeUnexpectedEOF(err.args)
 
         raise socket.error(err.args)
 
 
         raise socket.error(err.args)
 
@@ -330,6 +486,65 @@ def SocketOperation(poller, sock, op, arg1, timeout):
       raise
 
 
       raise
 
 
+def ShutdownConnection(sock, close_timeout, write_timeout, msgreader, force):
+  """Closes the connection.
+
+  @type sock: socket
+  @param sock: Socket to be shut down
+  @type close_timeout: float
+  @param close_timeout: How long to wait for the peer to close
+      the connection
+  @type write_timeout: float
+  @param write_timeout: Write timeout for shutdown
+  @type msgreader: http.HttpMessageReader
+  @param msgreader: Request message reader, used to determine whether
+      peer should close connection
+  @type force: bool
+  @param force: Whether to forcibly close the connection without
+      waiting for peer
+
+  """
+  #print msgreader.peer_will_close, force
+  if msgreader and msgreader.peer_will_close and not force:
+    # Wait for peer to close
+    try:
+      # Check whether it's actually closed
+      if not SocketOperation(sock, SOCKOP_RECV, 1, close_timeout):
+        return
+    except (socket.error, HttpError, HttpSocketTimeout):
+      # Ignore errors at this stage
+      pass
+
+  # Close the connection from our side
+  try:
+    # We don't care about the return value, see NOTES in SSL_shutdown(3).
+    SocketOperation(sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
+                    write_timeout)
+  except HttpSocketTimeout:
+    raise HttpError("Timeout while shutting down connection")
+  except socket.error, err:
+    # Ignore ENOTCONN
+    if not (err.args and err.args[0] == errno.ENOTCONN):
+      raise HttpError("Error while shutting down connection: %s" % err)
+
+
+def Handshake(sock, write_timeout):
+  """Shakes peer's hands.
+
+  @type sock: socket
+  @param sock: Socket to be shut down
+  @type write_timeout: float
+  @param write_timeout: Write timeout for handshake
+
+  """
+  try:
+    return SocketOperation(sock, SOCKOP_HANDSHAKE, None, write_timeout)
+  except HttpSocketTimeout:
+    raise HttpError("Timeout during SSL handshake")
+  except socket.error, err:
+    raise HttpError("Error in SSL handshake: %s" % err)
+
+
 class HttpSslParams(object):
   """Data class for SSL key and certificate.
 
 class HttpSslParams(object):
   """Data class for SSL key and certificate.
 
@@ -340,7 +555,8 @@ class HttpSslParams(object):
     @type ssl_key_path: string
     @param ssl_key_path: Path to file containing SSL key in PEM format
     @type ssl_cert_path: string
     @type ssl_key_path: string
     @param ssl_key_path: Path to file containing SSL key in PEM format
     @type ssl_cert_path: string
-    @param ssl_cert_path: Path to file containing SSL certificate in PEM format
+    @param ssl_cert_path: Path to file containing SSL certificate
+        in PEM format
 
     """
     self.ssl_key_pem = utils.ReadFile(ssl_key_path)
 
     """
     self.ssl_key_pem = utils.ReadFile(ssl_key_path)
@@ -355,12 +571,12 @@ class HttpSslParams(object):
                                            self.ssl_cert_pem)
 
 
                                            self.ssl_cert_pem)
 
 
-class HttpSocketBase(object):
+class HttpBase(object):
   """Base class for HTTP server and client.
 
   """
   def __init__(self):
   """Base class for HTTP server and client.
 
   """
   def __init__(self):
-    self._using_ssl = None
+    self.using_ssl = None
     self._ssl_params = None
     self._ssl_key = None
     self._ssl_cert = None
     self._ssl_params = None
     self._ssl_key = None
     self._ssl_cert = None
@@ -371,8 +587,8 @@ class HttpSocketBase(object):
     @type ssl_params: HttpSslParams
     @param ssl_params: SSL key and certificate
     @type ssl_verify_peer: bool
     @type ssl_params: HttpSslParams
     @param ssl_params: SSL key and certificate
     @type ssl_verify_peer: bool
-    @param ssl_verify_peer: Whether to require client certificate and compare
-                            it with our certificate
+    @param ssl_verify_peer: Whether to require client certificate
+        and compare it with our certificate
 
     """
     self._ssl_params = ssl_params
 
     """
     self._ssl_params = ssl_params
@@ -380,9 +596,9 @@ class HttpSocketBase(object):
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
     # Should we enable SSL?
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
     # Should we enable SSL?
-    self._using_ssl = ssl_params is not None
+    self.using_ssl = ssl_params is not None
 
 
-    if not self._using_ssl:
+    if not self.using_ssl:
       return sock
 
     self._ssl_key = ssl_params.GetKey()
       return sock
 
     self._ssl_key = ssl_params.GetKey()
@@ -415,697 +631,198 @@ class HttpSocketBase(object):
             self._ssl_cert.digest("md5") == cert.digest("md5"))
 
 
             self._ssl_cert.digest("md5") == cert.digest("md5"))
 
 
-class HttpServerRequestExecutor(object):
-  """Implements server side of HTTP
-
-  This class implements the server side of HTTP. It's based on code of Python's
-  BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
-  character encodings. Keep-alive connections are not supported.
+class HttpMessage(object):
+  """Data structure for HTTP message.
 
   """
 
   """
-  # The default request version.  This only affects responses up until
-  # the point where the request line is parsed, so it mainly decides what
-  # the client gets back when sending a malformed request line.
-  # Most web servers default to HTTP 0.9, i.e. don't send a status line.
-  default_request_version = HTTP_0_9
-
-  # Error message settings
-  error_message_format = DEFAULT_ERROR_MESSAGE
-  error_content_type = DEFAULT_ERROR_CONTENT_TYPE
-
-  responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
+  def __init__(self):
+    self.start_line = None
+    self.headers = None
+    self.body = None
+    self.decoded_body = None
 
 
-  def __init__(self, server, conn, client_addr, fileio_class):
-    """Initializes this class.
 
 
-    Part of the initialization is reading the request and eventual POST/PUT
-    data sent by the client.
+class HttpClientToServerStartLine(object):
+  """Data structure for HTTP request start line.
 
 
-    """
-    self._server = server
-
-    # We default rfile to buffered because otherwise it could be
-    # really slow for large data (a getc() call per byte); we make
-    # wfile unbuffered because (a) often after a write() we want to
-    # read and we need to flush the line; (b) big writes to unbuffered
-    # files are typically optimized by stdio even when big reads
-    # aren't.
-    self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
-    self.wfile = fileio_class(conn, mode="wb", bufsize=0)
-
-    self.client_addr = client_addr
-
-    self.request_headers = None
-    self.request_method = None
-    self.request_path = None
-    self.request_requestline = None
-    self.request_version = self.default_request_version
-
-    self.response_body = None
-    self.response_code = HTTP_OK
-    self.response_content_type = None
-    self.response_headers = {}
-
-    logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
-    try:
-      try:
-        try:
-          try:
-            # Read, parse and handle request
-            self._ReadRequest()
-            self._ReadPostData()
-            self._HandleRequest()
-          except HttpException, err:
-            self._SetErrorStatus(err)
-        finally:
-          # Try to send a response
-          self._SendResponse()
-          self._Close()
-      except SocketClosed:
-        pass
-    finally:
-      logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
-
-  def _Close(self):
-    if not self.wfile.closed:
-      self.wfile.flush()
-    self.wfile.close()
-    self.rfile.close()
-
-  def _DateTimeHeader(self):
-    """Return the current date and time formatted for a message header.
+  """
+  def __init__(self, method, path, version):
+    self.method = method
+    self.path = path
+    self.version = version
 
 
-    """
-    (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
-    return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
-            (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
+  def __str__(self):
+    return "%s %s %s" % (self.method, self.path, self.version)
 
 
-  def _SetErrorStatus(self, err):
-    """Sets the response code and body from a HttpException.
 
 
-    @type err: HttpException
-    @param err: Exception instance
+class HttpServerToClientStartLine(object):
+  """Data structure for HTTP response start line.
 
 
-    """
-    try:
-      (shortmsg, longmsg) = self.responses[err.code]
-    except KeyError:
-      shortmsg = longmsg = "Unknown"
+  """
+  def __init__(self, version, code, reason):
+    self.version = version
+    self.code = code
+    self.reason = reason
 
 
-    if err.message:
-      message = err.message
-    else:
-      message = shortmsg
+  def __str__(self):
+    return "%s %s %s" % (self.version, self.code, self.reason)
 
 
-    values = {
-      "code": err.code,
-      "message": cgi.escape(message),
-      "explain": longmsg,
-      }
 
 
-    self.response_code = err.code
-    self.response_content_type = self.error_content_type
-    self.response_body = self.error_message_format % values
+class HttpMessageWriter(object):
+  """Writes an HTTP message to a socket.
 
 
-  def _HandleRequest(self):
-    """Handle the actual request.
+  """
+  def __init__(self, sock, msg, write_timeout):
+    """Initializes this class and writes an HTTP message to a socket.
 
 
-    Calls the actual handler function and converts exceptions into HTTP errors.
+    @type sock: socket
+    @param sock: Socket to be written to
+    @type msg: http.HttpMessage
+    @param msg: HTTP message to be written
+    @type write_timeout: float
+    @param write_timeout: Write timeout for socket
 
     """
 
     """
-    # Don't do anything if there's already been a problem
-    if self.response_code != HTTP_OK:
-      return
+    self._msg = msg
 
 
-    assert self.request_method, "Status code %s requires a method" % HTTP_OK
+    self._PrepareMessage()
 
 
-    # Check whether client is still there
-    self.rfile.read(0)
+    buf = self._FormatMessage()
 
 
-    try:
-      try:
-        result = self._server.HandleRequest(self)
+    pos = 0
+    end = len(buf)
+    while pos < end:
+      # Send only SOCK_BUF_SIZE bytes at a time
+      data = buf[pos:(pos + SOCK_BUF_SIZE)]
 
 
-        # TODO: Content-type
-        encoder = HttpJsonConverter()
-        body = encoder.Encode(result)
+      sent = SocketOperation(sock, SOCKOP_SEND, data, write_timeout)
 
 
-        self.response_content_type = encoder.CONTENT_TYPE
-        self.response_body = body
-      except (HttpException, KeyboardInterrupt, SystemExit):
-        raise
-      except Exception, err:
-        logging.exception("Caught exception")
-        raise HttpInternalError(message=str(err))
-      except:
-        logging.exception("Unknown exception")
-        raise HttpInternalError(message="Unknown error")
+      # Remove sent bytes
+      pos += sent
 
 
-    except HttpException, err:
-      self._SetErrorStatus(err)
+    assert pos == end, "Message wasn't sent completely"
 
 
-  def _SendResponse(self):
-    """Sends response to the client.
+  def _PrepareMessage(self):
+    """Prepares the HTTP message by setting mandatory headers.
 
     """
 
     """
-    # Check whether client is still there
-    self.rfile.read(0)
+    # RFC2616, section 4.3: "The presence of a message-body in a request is
+    # signaled by the inclusion of a Content-Length or Transfer-Encoding header
+    # field in the request's message-headers."
+    if self._msg.body:
+      self._msg.headers[HTTP_CONTENT_LENGTH] = len(self._msg.body)
 
 
-    logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
-                 self.request_requestline, self.response_code)
-
-    if self.response_code in self.responses:
-      response_message = self.responses[self.response_code][0]
-    else:
-      response_message = ""
-
-    if self.request_version != HTTP_0_9:
-      self.wfile.write("%s %d %s\r\n" %
-                       (self.request_version, self.response_code,
-                        response_message))
-      self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
-      self._SendHeader(HTTP_DATE, self._DateTimeHeader())
-      self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
-      self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
-      for key, val in self.response_headers.iteritems():
-        self._SendHeader(key, val)
-
-      # We don't support keep-alive at this time
-      self._SendHeader(HTTP_CONNECTION, "close")
-      self.wfile.write("\r\n")
-
-    if (self.request_method != HTTP_HEAD and
-        self.response_code >= HTTP_OK and
-        self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
-      self.wfile.write(self.response_body)
-
-  def _SendHeader(self, name, value):
-    if self.request_version != HTTP_0_9:
-      self.wfile.write("%s: %s\r\n" % (name, value))
-
-  def _ReadRequest(self):
-    """Reads and parses request line
+  def _FormatMessage(self):
+    """Serializes the HTTP message into a string.
 
     """
 
     """
-    raw_requestline = self.rfile.readline()
-
-    requestline = raw_requestline
-    if requestline[-2:] == '\r\n':
-      requestline = requestline[:-2]
-    elif requestline[-1:] == '\n':
-      requestline = requestline[:-1]
-
-    if not requestline:
-      raise HttpBadRequest("Empty request line")
-
-    self.request_requestline = requestline
-
-    logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
+    buf = StringIO()
 
 
-    words = requestline.split()
+    # Add start line
+    buf.write(str(self._msg.start_line))
+    buf.write("\r\n")
 
 
-    if len(words) == 3:
-      [method, path, version] = words
-      if version[:5] != 'HTTP/':
-        raise HttpBadRequest("Bad request version (%r)" % version)
+    # Add headers
+    if self._msg.start_line.version != HTTP_0_9:
+      for name, value in self._msg.headers.iteritems():
+        buf.write("%s: %s\r\n" % (name, value))
 
 
-      try:
-        base_version_number = version.split('/', 1)[1]
-        version_number = base_version_number.split(".")
-
-        # RFC 2145 section 3.1 says there can be only one "." and
-        #   - major and minor numbers MUST be treated as
-        #      separate integers;
-        #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
-        #      turn is lower than HTTP/12.3;
-        #   - Leading zeros MUST be ignored by recipients.
-        if len(version_number) != 2:
-          raise HttpBadRequest("Bad request version (%r)" % version)
-
-        version_number = int(version_number[0]), int(version_number[1])
-      except (ValueError, IndexError):
-        raise HttpBadRequest("Bad request version (%r)" % version)
-
-      if version_number >= (2, 0):
-        raise HttpVersionNotSupported("Invalid HTTP Version (%s)" %
-                                      base_version_number)
-
-    elif len(words) == 2:
-      version = HTTP_0_9
-      [method, path] = words
-      if method != HTTP_GET:
-        raise HttpBadRequest("Bad HTTP/0.9 request type (%r)" % method)
+    buf.write("\r\n")
 
 
-    else:
-      raise HttpBadRequest("Bad request syntax (%r)" % requestline)
+    # Add message body if needed
+    if self.HasMessageBody():
+      buf.write(self._msg.body)
 
 
-    # Examine the headers and look for a Connection directive
-    headers = mimetools.Message(self.rfile, 0)
+    elif self._msg.body:
+      logging.warning("Ignoring message body")
 
 
-    self.request_method = method
-    self.request_path = path
-    self.request_version = version
-    self.request_headers = headers
+    return buf.getvalue()
 
 
-  def _ReadPostData(self):
-    """Reads POST/PUT data
+  def HasMessageBody(self):
+    """Checks whether the HTTP message contains a body.
 
 
-    Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
-    a request is signaled by the inclusion of a Content-Length header field in
-    the request message headers. HTTP/1.0 requests containing an entity body
-    must include a valid Content-Length header field."
+    Can be overridden by subclasses.
 
     """
 
     """
-    # While not according to specification, we only support an entity body for
-    # POST and PUT.
-    if (not self.request_method or
-        self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
-      self.request_post_data = None
-      return
-
-    content_length = None
-    try:
-      if HTTP_CONTENT_LENGTH in self.request_headers:
-        content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
-    except TypeError:
-      pass
-    except ValueError:
-      pass
-
-    # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
-    if content_length is None:
-      raise HttpLengthRequired("Missing Content-Length header or"
-                               " invalid format")
-
-    data = self.rfile.read(content_length)
-
-    # TODO: Content-type, error handling
-    if data:
-      self.request_post_data = HttpJsonConverter().Decode(data)
-    else:
-      self.request_post_data = None
-
-    logging.debug("HTTP POST data: %s", self.request_post_data)
+    return bool(self._msg.body)
 
 
 
 
-class HttpServer(HttpSocketBase):
-  """Generic HTTP server class
-
-  Users of this class must subclass it and override the HandleRequest function.
+class HttpMessageReader(object):
+  """Reads HTTP message from socket.
 
   """
 
   """
-  MAX_CHILDREN = 20
-
-  def __init__(self, mainloop, local_address, port,
-               ssl_params=None, ssl_verify_peer=False):
-    """Initializes the HTTP server
-
-    @type mainloop: ganeti.daemon.Mainloop
-    @param mainloop: Mainloop used to poll for I/O events
-    @type local_addess: string
-    @param local_address: Local IP address to bind to
-    @type port: int
-    @param port: TCP port to listen on
-    @type ssl_params: HttpSslParams
-    @param ssl_params: SSL key and certificate
-    @type ssl_verify_peer: bool
-    @param ssl_verify_peer: Whether to require client certificate and compare
-                            it with our certificate
-
-    """
-    HttpSocketBase.__init__(self)
-
-    self.mainloop = mainloop
-    self.local_address = local_address
-    self.port = port
-
-    self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
-
-    # Allow port to be reused
-    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-    if self._using_ssl:
-      self._fileio_class = _SSLFileObject
-    else:
-      self._fileio_class = socket._fileobject
-
-    self._children = []
-
-    mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
-    mainloop.RegisterSignal(self)
-
-  def Start(self):
-    self.socket.bind((self.local_address, self.port))
-    self.socket.listen(5)
-
-  def Stop(self):
-    self.socket.close()
-
-  def OnIO(self, fd, condition):
-    if condition & select.POLLIN:
-      self._IncomingConnection()
-
-  def OnSignal(self, signum):
-    if signum == signal.SIGCHLD:
-      self._CollectChildren(True)
-
-  def _CollectChildren(self, quick):
-    """Checks whether any child processes are done
-
-    @type quick: bool
-    @param quick: Whether to only use non-blocking functions
-
-    """
-    if not quick:
-      # Don't wait for other processes if it should be a quick check
-      while len(self._children) > self.MAX_CHILDREN:
-        try:
-          # Waiting without a timeout brings us into a potential DoS situation.
-          # As soon as too many children run, we'll not respond to new
-          # requests. The real solution would be to add a timeout for children
-          # and killing them after some time.
-          pid, status = os.waitpid(0, 0)
-        except os.error:
-          pid = None
-        if pid and pid in self._children:
-          self._children.remove(pid)
-
-    for child in self._children:
-      try:
-        pid, status = os.waitpid(child, os.WNOHANG)
-      except os.error:
-        pid = None
-      if pid and pid in self._children:
-        self._children.remove(pid)
-
-  def _IncomingConnection(self):
-    """Called for each incoming connection
-
-    """
-    (connection, client_addr) = self.socket.accept()
-
-    self._CollectChildren(False)
-
-    pid = os.fork()
-    if pid == 0:
-      # Child process
-      try:
-        HttpServerRequestExecutor(self, connection, client_addr,
-                                  self._fileio_class)
-      except:
-        logging.exception("Error while handling request from %s:%s",
-                          client_addr[0], client_addr[1])
-        os._exit(1)
-      os._exit(0)
-    else:
-      self._children.append(pid)
-
-  def HandleRequest(self, req):
-    raise NotImplementedError()
-
-
-class HttpClientRequest(object):
-  def __init__(self, host, port, method, path, headers=None, post_data=None,
-               ssl_params=None, ssl_verify_peer=False):
-    """Describes an HTTP request.
-
-    @type host: string
-    @param host: Hostname
-    @type port: int
-    @param port: Port
-    @type method: string
-    @param method: Method name
-    @type path: string
-    @param path: Request path
-    @type headers: dict or None
-    @param headers: Additional headers to send
-    @type post_data: string or None
-    @param post_data: Additional data to send
-    @type ssl_params: HttpSslParams
-    @param ssl_params: SSL key and certificate
-    @type ssl_verify_peer: bool
-    @param ssl_verify_peer: Whether to compare our certificate with server's
-                            certificate
-
-    """
-    if post_data is not None:
-      assert method.upper() in (HTTP_POST, HTTP_PUT), \
-        "Only POST and GET requests support sending data"
-
-    assert path.startswith("/"), "Path must start with slash (/)"
-
-    self.host = host
-    self.port = port
-    self.ssl_params = ssl_params
-    self.ssl_verify_peer = ssl_verify_peer
-    self.method = method
-    self.path = path
-    self.headers = headers
-    self.post_data = post_data
-
-    self.success = None
-    self.error = None
-
-    self.resp_status_line = None
-    self.resp_version = None
-    self.resp_status = None
-    self.resp_reason = None
-    self.resp_headers = None
-    self.resp_body = None
-
-
-class HttpClientRequestExecutor(HttpSocketBase):
-  # Default headers
-  DEFAULT_HEADERS = {
-    HTTP_USER_AGENT: HTTP_GANETI_VERSION,
-    # TODO: For keep-alive, don't send "Connection: close"
-    HTTP_CONNECTION: "close",
-    }
-
   # Length limits
   # Length limits
-  STATUS_LINE_LENGTH_MAX = 512
-  HEADER_LENGTH_MAX = 4 * 1024
-
-  # Timeouts in seconds for socket layer
-  # TODO: Make read timeout configurable per OpCode
-  CONNECT_TIMEOUT = 5.0
-  WRITE_TIMEOUT = 10
-  READ_TIMEOUT = None
-  CLOSE_TIMEOUT = 1
+  START_LINE_LENGTH_MAX = None
+  HEADER_LENGTH_MAX = None
 
   # Parser state machine
 
   # Parser state machine
-  PS_STATUS_LINE = "status-line"
+  PS_START_LINE = "start-line"
   PS_HEADERS = "headers"
   PS_HEADERS = "headers"
-  PS_BODY = "body"
+  PS_BODY = "entity-body"
   PS_COMPLETE = "complete"
 
   PS_COMPLETE = "complete"
 
-  def __init__(self, req):
-    """Initializes the HttpClientRequestExecutor class.
+  def __init__(self, sock, msg, read_timeout):
+    """Reads an HTTP message from a socket.
 
 
-    @type req: HttpClientRequest
-    @param req: Request object
+    @type sock: socket
+    @param sock: Socket to be read from
+    @type msg: http.HttpMessage
+    @param msg: Object for the read message
+    @type read_timeout: float
+    @param read_timeout: Read timeout for socket
 
     """
 
     """
-    HttpSocketBase.__init__(self)
-
-    self.request = req
+    self.sock = sock
+    self.msg = msg
 
 
-    self.parser_status = self.PS_STATUS_LINE
+    self.start_line_buffer = None
     self.header_buffer = StringIO()
     self.body_buffer = StringIO()
     self.header_buffer = StringIO()
     self.body_buffer = StringIO()
+    self.parser_status = self.PS_START_LINE
     self.content_length = None
     self.content_length = None
-    self.server_will_close = None
-
-    self.poller = select.poll()
-
-    try:
-      # TODO: Implement connection caching/keep-alive
-      self.sock = self._CreateSocket(req.ssl_params,
-                                     req.ssl_verify_peer)
-
-      # Disable Python's timeout
-      self.sock.settimeout(None)
-
-      # Operate in non-blocking mode
-      self.sock.setblocking(0)
-
-      force_close = True
-      self._Connect()
-      try:
-        self._SendRequest()
-        self._ReadResponse()
-
-        # Only wait for server to close if we didn't have any exception.
-        force_close = False
-      finally:
-        self._CloseConnection(force_close)
-
-      self.sock.close()
-      self.sock = None
-
-      req.resp_body = self.body_buffer.getvalue()
-
-      req.success = True
-      req.error = None
-
-    except _HttpClientError, err:
-      req.success = False
-      req.error = str(err)
-
-  def _BuildRequest(self):
-    """Build HTTP request.
-
-    @rtype: string
-    @return: Complete request
-
-    """
-    # Headers
-    send_headers = self.DEFAULT_HEADERS.copy()
-
-    if self.request.headers:
-      send_headers.update(self.request.headers)
-
-    send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
-
-    if self.request.post_data:
-      send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
+    self.peer_will_close = None
 
 
-    buf = StringIO()
-
-    # Add request line. We only support HTTP/1.0 (no chunked transfers and no
-    # keep-alive).
-    # TODO: For keep-alive, change to HTTP/1.1
-    buf.write("%s %s %s\r\n" % (self.request.method.upper(),
-                                self.request.path, HTTP_1_0))
-
-    # Add headers
-    for name, value in send_headers.iteritems():
-      buf.write("%s: %s\r\n" % (name, value))
-
-    buf.write("\r\n")
-
-    if self.request.post_data:
-      buf.write(self.request.post_data)
-
-    return buf.getvalue()
-
-  def _ParseStatusLine(self):
-    """Parses the status line sent by the server.
-
-    """
-    line = self.request.resp_status_line
-
-    if not line:
-      raise _HttpClientError("Empty status line")
-
-    try:
-      [version, status, reason] = line.split(None, 2)
-    except ValueError:
-      try:
-        [version, status] = line.split(None, 1)
-        reason = ""
-      except ValueError:
-        version = HTTP_9_0
-
-    if version:
-      version = version.upper()
-
-    if version not in (HTTP_1_0, HTTP_1_1):
-      # We do not support HTTP/0.9, despite the specification requiring it
-      # (RFC2616, section 19.6)
-      raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
-                             line)
-
-    # The status code is a three-digit number
-    try:
-      status = int(status)
-      if status < 100 or status > 999:
-        status = -1
-    except ValueError:
-      status = -1
-
-    if status == -1:
-      raise _HttpClientError("Invalid status code (%r)" % line)
-
-    self.request.resp_version = version
-    self.request.resp_status = status
-    self.request.resp_reason = reason
-
-  def _WillServerCloseConnection(self):
-    """Evaluate whether server will close the connection.
-
-    @rtype: bool
-    @return: Whether server will close the connection
-
-    """
-    hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
-    if hdr_connection:
-      hdr_connection = hdr_connection.lower()
-
-    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
-    if self.request.resp_version == HTTP_1_1:
-      return (hdr_connection and "close" in hdr_connection)
-
-    # Some HTTP/1.0 implementations have support for persistent connections,
-    # using rules different than HTTP/1.1.
-
-    # For older HTTP, Keep-Alive indicates persistent connection.
-    if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
-      return False
-
-    # At least Akamai returns a "Connection: Keep-Alive" header, which was
-    # supposed to be sent by the client.
-    if hdr_connection and "keep-alive" in hdr_connection:
-      return False
-
-    return True
-
-  def _ParseHeaders(self):
-    """Parses the headers sent by the server.
-
-    This function also adjusts internal variables based on the header values.
+    buf = ""
+    eof = False
+    while self.parser_status != self.PS_COMPLETE:
+      # TODO: Don't read more than necessary (Content-Length), otherwise
+      # data might be lost and/or an error could occur
+      data = SocketOperation(sock, SOCKOP_RECV, SOCK_BUF_SIZE, read_timeout)
 
 
-    """
-    req = self.request
+      if data:
+        buf += data
+      else:
+        eof = True
 
 
-    # Parse headers
-    self.header_buffer.seek(0, 0)
-    req.resp_headers = mimetools.Message(self.header_buffer, 0)
+      # Do some parsing and error checking while more data arrives
+      buf = self._ContinueParsing(buf, eof)
 
 
-    self.server_will_close = self._WillServerCloseConnection()
+      # Must be done only after the buffer has been evaluated
+      # TODO: Connection-length < len(data read) and connection closed
+      if (eof and
+          self.parser_status in (self.PS_START_LINE,
+                                 self.PS_HEADERS)):
+        raise HttpError("Connection closed prematurely")
 
 
-    # Do we have a Content-Length header?
-    hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
-    if hdr_content_length:
-      try:
-        self.content_length = int(hdr_content_length)
-      except ValueError:
-        pass
-      if self.content_length is not None and self.content_length < 0:
-        self.content_length = None
+    # Parse rest
+    buf = self._ContinueParsing(buf, True)
 
 
-    # does the body have a fixed length? (of zero)
-    if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
-        100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
-      self.content_length = 0
+    assert self.parser_status == self.PS_COMPLETE
+    assert not buf, "Parser didn't read full response"
 
 
-    # if the connection remains open and a content-length was not provided,
-    # then assume that the connection WILL close.
-    if self.content_length is None:
-      self.server_will_close = True
+    msg.body = self.body_buffer.getvalue()
 
 
-  def _CheckStatusLineLength(self, length):
-    if length > self.STATUS_LINE_LENGTH_MAX:
-      raise _HttpClientError("Status line longer than %d chars" %
-                             self.STATUS_LINE_LENGTH_MAX)
+    # TODO: Content-type, error handling
+    if msg.body:
+      msg.decoded_body = HttpJsonConverter().Decode(msg.body)
+    else:
+      msg.decoded_body = None
 
 
-  def _CheckHeaderLength(self, length):
-    if length > self.HEADER_LENGTH_MAX:
-      raise _HttpClientError("Headers longer than %d chars" %
-                             self.HEADER_LENGTH_MAX)
+    if msg.decoded_body:
+      logging.debug("Message body: %s", msg.decoded_body)
 
 
-  def _ParseBuffer(self, buf, eof):
-    """Main function for HTTP response state machine.
+  def _ContinueParsing(self, buf, eof):
+    """Main function for HTTP message state machine.
 
     @type buf: string
     @param buf: Receive buffer
 
     @type buf: string
     @param buf: Receive buffer
@@ -1115,25 +832,41 @@ class HttpClientRequestExecutor(HttpSocketBase):
     @return: Updated receive buffer
 
     """
     @return: Updated receive buffer
 
     """
-    if self.parser_status == self.PS_STATUS_LINE:
-      # Expect status line
-      idx = buf.find("\r\n")
-      if idx >= 0:
-        self.request.resp_status_line = buf[:idx]
-
-        self._CheckStatusLineLength(len(self.request.resp_status_line))
+    # TODO: Use offset instead of slicing when possible
+    if self.parser_status == self.PS_START_LINE:
+      # Expect start line
+      while True:
+        idx = buf.find("\r\n")
+
+        # RFC2616, section 4.1: "In the interest of robustness, servers SHOULD
+        # ignore any empty line(s) received where a Request-Line is expected.
+        # In other words, if the server is reading the protocol stream at the
+        # beginning of a message and receives a CRLF first, it should ignore
+        # the CRLF."
+        if idx == 0:
+          # TODO: Limit number of CRLFs/empty lines for safety?
+          buf = buf[:2]
+          continue
+
+        if idx > 0:
+          self.start_line_buffer = buf[:idx]
+
+          self._CheckStartLineLength(len(self.start_line_buffer))
+
+          # Remove status line, including CRLF
+          buf = buf[idx + 2:]
+
+          self.msg.start_line = self.ParseStartLine(self.start_line_buffer)
+
+          self.parser_status = self.PS_HEADERS
+        else:
+          # Check whether incoming data is getting too large, otherwise we just
+          # fill our read buffer.
+          self._CheckStartLineLength(len(buf))
 
 
-        # Remove status line, including CRLF
-        buf = buf[idx + 2:]
-
-        self._ParseStatusLine()
-
-        self.parser_status = self.PS_HEADERS
-      else:
-        # Check whether incoming data is getting too large, otherwise we just
-        # fill our read buffer.
-        self._CheckStatusLineLength(len(buf))
+        break
 
 
+    # TODO: Handle messages without headers
     if self.parser_status == self.PS_HEADERS:
       # Wait for header end
       idx = buf.find("\r\n\r\n")
     if self.parser_status == self.PS_HEADERS:
       # Wait for header end
       idx = buf.find("\r\n\r\n")
@@ -1154,324 +887,127 @@ class HttpClientRequestExecutor(HttpSocketBase):
         self._CheckHeaderLength(len(buf))
 
     if self.parser_status == self.PS_BODY:
         self._CheckHeaderLength(len(buf))
 
     if self.parser_status == self.PS_BODY:
+      # TODO: Implement max size for body_buffer
       self.body_buffer.write(buf)
       buf = ""
 
       # Check whether we've read everything
       self.body_buffer.write(buf)
       buf = ""
 
       # Check whether we've read everything
+      #
+      # RFC2616, section 4.4: "When a message-body is included with a message,
+      # the transfer-length of that body is determined by one of the following
+      # [...] 5. By the server closing the connection. (Closing the connection
+      # cannot be used to indicate the end of a request body, since that would
+      # leave no possibility for the server to send back a response.)"
+      #
+      # TODO: Error when buffer length > Content-Length header
       if (eof or
       if (eof or
+          self.content_length is None or
           (self.content_length is not None and
            self.body_buffer.tell() >= self.content_length)):
         self.parser_status = self.PS_COMPLETE
 
     return buf
 
           (self.content_length is not None and
            self.body_buffer.tell() >= self.content_length)):
         self.parser_status = self.PS_COMPLETE
 
     return buf
 
-  def _Connect(self):
-    """Non-blocking connect to host with timeout.
-
-    """
-    connected = False
-    while True:
-      try:
-        connect_error = self.sock.connect_ex((self.request.host,
-                                              self.request.port))
-      except socket.gaierror, err:
-        raise _HttpClientError("Connection failed: %s" % str(err))
-
-      if connect_error == errno.EINTR:
-        # Mask signals
-        pass
-
-      elif connect_error == 0:
-        # Connection established
-        connected = True
-        break
+  def _CheckStartLineLength(self, length):
+    """Limits the start line buffer size.
 
 
-      elif connect_error == errno.EINPROGRESS:
-        # Connection started
-        break
-
-      raise _HttpClientError("Connection failed (%s: %s)" %
-                             (connect_error, os.strerror(connect_error)))
-
-    if not connected:
-      # Wait for connection
-      event = WaitForSocketCondition(self.poller, self.sock,
-                                     select.POLLOUT, self.CONNECT_TIMEOUT)
-      if event is None:
-        raise _HttpClientError("Timeout while connecting to server")
-
-      # Get error code
-      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
-      if connect_error != 0:
-        raise _HttpClientError("Connection failed (%s: %s)" %
-                               (connect_error, os.strerror(connect_error)))
-
-    # Enable TCP keep-alive
-    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-
-    # If needed, Linux specific options are available to change the TCP
-    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
-    # TCP_KEEPINTVL.
-
-  def _SendRequest(self):
-    """Sends request to server.
+    @type length: int
+    @param length: Buffer size
 
     """
 
     """
-    buf = self._BuildRequest()
-
-    while buf:
-      # Send only 4 KB at a time
-      data = buf[:4096]
+    if (self.START_LINE_LENGTH_MAX is not None and
+        length > self.START_LINE_LENGTH_MAX):
+      raise HttpError("Start line longer than %d chars" %
+                       self.START_LINE_LENGTH_MAX)
 
 
-      try:
-        sent = SocketOperation(self.poller, self.sock, SOCKOP_SEND, data,
-                               self.WRITE_TIMEOUT)
-      except HttpSocketTimeout:
-        raise _HttpClientError("Timeout while sending request")
-      except socket.error, err:
-        raise _HttpClientError("Error sending request: %s" % err)
-
-      # Remove sent bytes
-      buf = buf[sent:]
-
-    assert not buf, "Request wasn't sent completely"
-
-  def _ReadResponse(self):
-    """Read response from server.
+  def _CheckHeaderLength(self, length):
+    """Limits the header buffer size.
 
 
-    Calls the parser function after reading a chunk of data.
+    @type length: int
+    @param length: Buffer size
 
     """
 
     """
-    buf = ""
-    eof = False
-    while self.parser_status != self.PS_COMPLETE:
-      try:
-        data = SocketOperation(self.poller, self.sock, SOCKOP_RECV, 4096,
-                               self.READ_TIMEOUT)
-      except HttpSocketTimeout:
-        raise _HttpClientError("Timeout while reading response")
-      except socket.error, err:
-        raise _HttpClientError("Error while reading response: %s" % err)
-
-      if data:
-        buf += data
-      else:
-        eof = True
+    if (self.HEADER_LENGTH_MAX is not None and
+        length > self.HEADER_LENGTH_MAX):
+      raise HttpError("Headers longer than %d chars" % self.HEADER_LENGTH_MAX)
 
 
-      # Do some parsing and error checking while more data arrives
-      buf = self._ParseBuffer(buf, eof)
+  def ParseStartLine(self, start_line):
+    """Parses the start line of a message.
 
 
-      # Must be done only after the buffer has been evaluated
-      if (eof and
-          self.parser_status in (self.PS_STATUS_LINE,
-                                 self.PS_HEADERS)):
-        raise _HttpClientError("Connection closed prematurely")
+    Must be overridden by subclass.
 
 
-    # Parse rest
-    buf = self._ParseBuffer(buf, True)
-
-    assert self.parser_status == self.PS_COMPLETE
-    assert not buf, "Parser didn't read full response"
-
-  def _CloseConnection(self, force):
-    """Closes the connection.
+    @type start_line: string
+    @param start_line: Start line string
 
     """
 
     """
-    if self.server_will_close and not force:
-      # Wait for server to close
-      try:
-        # Check whether it's actually closed
-        if not SocketOperation(self.poller, self.sock, SOCKOP_RECV, 1,
-                               self.CLOSE_TIMEOUT):
-          return
-      except (socket.error, _HttpClientError, HttpSocketTimeout):
-        # Ignore errors at this stage
-        pass
-
-    # Close the connection from our side
-    try:
-      SocketOperation(self.poller, self.sock, SOCKOP_SHUTDOWN,
-                      socket.SHUT_RDWR, self.WRITE_TIMEOUT)
-    except HttpSocketTimeout:
-      raise _HttpClientError("Timeout while shutting down connection")
-    except socket.error, err:
-      raise _HttpClientError("Error while shutting down connection: %s" % err)
-
-
-class _HttpClientPendingRequest(object):
-  """Data class for pending requests.
-
-  """
-  def __init__(self, request):
-    self.request = request
-
-    # Thread synchronization
-    self.done = threading.Event()
-
-
-class HttpClientWorker(workerpool.BaseWorker):
-  """HTTP client worker class.
-
-  """
-  def RunTask(self, pend_req):
-    try:
-      HttpClientRequestExecutor(pend_req.request)
-    finally:
-      pend_req.done.set()
-
-
-class HttpClientWorkerPool(workerpool.WorkerPool):
-  def __init__(self, manager):
-    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
-                                   HttpClientWorker)
-    self.manager = manager
-
-
-class HttpClientManager(object):
-  """Manages HTTP requests.
-
-  """
-  def __init__(self):
-    self._wpool = HttpClientWorkerPool(self)
-
-  def __del__(self):
-    self.Shutdown()
-
-  def ExecRequests(self, requests):
-    """Execute HTTP requests.
+    raise NotImplementedError()
 
 
-    This function can be called from multiple threads at the same time.
+  def _WillPeerCloseConnection(self):
+    """Evaluate whether peer will close the connection.
 
 
-    @type requests: List of HttpClientRequest instances
-    @param requests: The requests to execute
-    @rtype: List of HttpClientRequest instances
-    @returns: The list of requests passed in
+    @rtype: bool
+    @return: Whether peer will close the connection
 
     """
 
     """
-    # _HttpClientPendingRequest is used for internal thread synchronization
-    pending = [_HttpClientPendingRequest(req) for req in requests]
-
-    try:
-      # Add requests to queue
-      for pend_req in pending:
-        self._wpool.AddTask(pend_req)
-
-    finally:
-      # In case of an exception we should still wait for the rest, otherwise
-      # another thread from the worker pool could modify the request object
-      # after we returned.
-
-      # And wait for them to finish
-      for pend_req in pending:
-        pend_req.done.wait()
-
-    # Return original list
-    return requests
-
-  def Shutdown(self):
-    self._wpool.Quiesce()
-    self._wpool.TerminateWorkers()
-
-
-class _SSLFileObject(object):
-  """Wrapper around socket._fileobject
-
-  This wrapper is required to handle OpenSSL exceptions.
-
-  """
-  def _RequireOpenSocket(fn):
-    def wrapper(self, *args, **kwargs):
-      if self.closed:
-        raise SocketClosed("Socket is closed")
-      return fn(self, *args, **kwargs)
-    return wrapper
-
-  def __init__(self, sock, mode='rb', bufsize=-1):
-    self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
-
-  def _ConnectionLost(self):
-    self._base = None
-
-  def _getclosed(self):
-    return self._base is None or self._base.closed
-  closed = property(_getclosed, doc="True if the file is closed")
-
-  @_RequireOpenSocket
-  def close(self):
-    return self._base.close()
-
-  @_RequireOpenSocket
-  def flush(self):
-    return self._base.flush()
-
-  @_RequireOpenSocket
-  def fileno(self):
-    return self._base.fileno()
+    # RFC2616, section 14.10: "HTTP/1.1 defines the "close" connection option
+    # for the sender to signal that the connection will be closed after
+    # completion of the response. For example,
+    #
+    #        Connection: close
+    #
+    # in either the request or the response header fields indicates that the
+    # connection SHOULD NOT be considered `persistent' (section 8.1) after the
+    # current request/response is complete."
+
+    hdr_connection = self.msg.headers.get(HTTP_CONNECTION, None)
+    if hdr_connection:
+      hdr_connection = hdr_connection.lower()
 
 
-  @_RequireOpenSocket
-  def read(self, size=-1):
-    return self._ReadWrapper(self._base.read, size=size)
+    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
+    if self.msg.start_line.version == HTTP_1_1:
+      return (hdr_connection and "close" in hdr_connection)
 
 
-  @_RequireOpenSocket
-  def readline(self, size=-1):
-    return self._ReadWrapper(self._base.readline, size=size)
+    # Some HTTP/1.0 implementations have support for persistent connections,
+    # using rules different than HTTP/1.1.
 
 
-  def _ReadWrapper(self, fn, *args, **kwargs):
-    while True:
-      try:
-        return fn(*args, **kwargs)
+    # For older HTTP, Keep-Alive indicates persistent connection.
+    if self.msg.headers.get(HTTP_KEEP_ALIVE):
+      return False
 
 
-      except OpenSSL.SSL.ZeroReturnError, err:
-        self._ConnectionLost()
-        return ""
+    # At least Akamai returns a "Connection: Keep-Alive" header, which was
+    # supposed to be sent by the client.
+    if hdr_connection and "keep-alive" in hdr_connection:
+      return False
 
 
-      except OpenSSL.SSL.WantReadError:
-        continue
+    return True
 
 
-      #except OpenSSL.SSL.WantWriteError:
-      # TODO
+  def _ParseHeaders(self):
+    """Parses the headers.
 
 
-      except OpenSSL.SSL.SysCallError, (retval, desc):
-        if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
-            or retval > 0):
-          self._ConnectionLost()
-          return ""
+    This function also adjusts internal variables based on header values.
 
 
-        logging.exception("Error in OpenSSL")
-        self._ConnectionLost()
-        raise socket.error(err.args)
+    RFC2616, section 4.3: The presence of a message-body in a request is
+    signaled by the inclusion of a Content-Length or Transfer-Encoding header
+    field in the request's message-headers.
 
 
-      except OpenSSL.SSL.Error, err:
-        self._ConnectionLost()
-        raise socket.error(err.args)
+    """
+    # Parse headers
+    self.header_buffer.seek(0, 0)
+    self.msg.headers = mimetools.Message(self.header_buffer, 0)
 
 
-  @_RequireOpenSocket
-  def write(self, data):
-    return self._WriteWrapper(self._base.write, data)
+    self.peer_will_close = self._WillPeerCloseConnection()
 
 
-  def _WriteWrapper(self, fn, *args, **kwargs):
-    while True:
+    # Do we have a Content-Length header?
+    hdr_content_length = self.msg.headers.get(HTTP_CONTENT_LENGTH, None)
+    if hdr_content_length:
       try:
       try:
-        return fn(*args, **kwargs)
-      except OpenSSL.SSL.ZeroReturnError, err:
-        self._ConnectionLost()
-        return 0
-
-      except OpenSSL.SSL.WantWriteError:
-        continue
-
-      #except OpenSSL.SSL.WantReadError:
-      # TODO
-
-      except OpenSSL.SSL.SysCallError, err:
-        if err.args[0] == -1 and data == "":
-          # errors when writing empty strings are expected
-          # and can be ignored
-          return 0
-
-        self._ConnectionLost()
-        raise socket.error(err.args)
+        self.content_length = int(hdr_content_length)
+      except ValueError:
+        self.content_length = None
+      if self.content_length is not None and self.content_length < 0:
+        self.content_length = None
 
 
-      except OpenSSL.SSL.Error, err:
-        self._ConnectionLost()
-        raise socket.error(err.args)
+    # if the connection remains open and a content-length was not provided,
+    # then assume that the connection WILL close.
+    if self.content_length is None:
+      self.peer_will_close = True