From 02cab3e7536e27274df63de4718ab39aa8ca7de8 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann Date: Thu, 4 Dec 2008 15:23:38 +0000 Subject: [PATCH] ganeti.http: Split HTTP server and client into separate files This includes a large rewrite of the HTTP server code. The handling of OpenSSL errors had some problems that were hard to fix with its structure. When preparing all of this, I realized that actually HTTP is a message protocol and that the same code can be used on both the server and client side to parse requests/responses, with only a few differences. There are still a few TODOs in the code, but none should be a show stopper. Many pylint warnings have been fixed, too. The old code will be removed once all users have been migrated. Reviewed-by: amishchenko --- Makefile.am | 4 +- lib/http/__init__.py | 399 ++++++++++++++++++++++++++++++++++++++++- lib/http/client.py | 388 ++++++++++++++++++++++++++++++++++++++++ lib/http/server.py | 484 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 1271 insertions(+), 4 deletions(-) create mode 100644 lib/http/client.py create mode 100644 lib/http/server.py diff --git a/Makefile.am b/Makefile.am index 9483ed5..e427760 100644 --- a/Makefile.am +++ b/Makefile.am @@ -101,7 +101,9 @@ rapi_PYTHON = \ lib/rapi/rlib2.py http_PYTHON = \ - lib/http/__init__.py + lib/http/__init__.py \ + lib/http/client.py \ + lib/http/server.py docsgml = \ diff --git a/lib/http/__init__.py b/lib/http/__init__.py index b4f8c3b..6eb1eb0 100644 --- a/lib/http/__init__.py +++ b/lib/http/__init__.py @@ -18,7 +18,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. -"""HTTP server module. +"""HTTP module. """ @@ -30,10 +30,8 @@ import OpenSSL import os import select import socket -import sys import time import signal -import logging import errno import threading @@ -103,6 +101,14 @@ class SocketClosed(socket.error): pass +class HttpError(Exception): + """Internal exception for HTTP errors. + + This should only be used for internal error reporting. + + """ + + class _HttpClientError(Exception): """Internal exception for HTTP client errors. @@ -330,6 +336,34 @@ def SocketOperation(poller, sock, op, arg1, timeout): raise +def ShutdownConnection(poller, sock, close_timeout, write_timeout, msgreader, + force): + """Closes the connection. + + """ + poller = select.poll() + + #print msgreader.peer_will_close, force + if msgreader and msgreader.peer_will_close and not force: + # Wait for peer to close + try: + # Check whether it's actually closed + if not SocketOperation(poller, sock, SOCKOP_RECV, 1, close_timeout): + return + except (socket.error, HttpError, HttpSocketTimeout): + # Ignore errors at this stage + pass + + # Close the connection from our side + try: + SocketOperation(poller, sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR, + write_timeout) + except HttpSocketTimeout: + raise HttpError("Timeout while shutting down connection") + except socket.error, err: + raise HttpError("Error while shutting down connection: %s" % err) + + class HttpSslParams(object): """Data class for SSL key and certificate. @@ -1475,3 +1509,362 @@ class _SSLFileObject(object): except OpenSSL.SSL.Error, err: self._ConnectionLost() raise socket.error(err.args) + + +class HttpMessage(object): + """Data structure for HTTP message. + + """ + def __init__(self): + self.start_line = None + self.headers = None + self.body = None + self.decoded_body = None + + +class HttpClientToServerStartLine(object): + """Data structure for HTTP request start line. + + """ + def __init__(self, method, path, version): + self.method = method + self.path = path + self.version = version + + def __str__(self): + return "%s %s %s" % (self.method, self.path, self.version) + + +class HttpServerToClientStartLine(object): + """Data structure for HTTP response start line. + + """ + def __init__(self, version, code, reason): + self.version = version + self.code = code + self.reason = reason + + def __str__(self): + return "%s %s %s" % (self.version, self.code, self.reason) + + +class HttpMessageWriter(object): + """Writes an HTTP message to a socket. + + """ + def __init__(self, sock, msg, write_timeout): + self._msg = msg + + self._PrepareMessage() + + buf = self._FormatMessage() + + poller = select.poll() + while buf: + # Send only 4 KB at a time + data = buf[:4096] + + sent = SocketOperation(poller, sock, SOCKOP_SEND, data, + write_timeout) + + # Remove sent bytes + buf = buf[sent:] + + assert not buf, "Message wasn't sent completely" + + def _PrepareMessage(self): + """Prepares the HTTP message by setting mandatory headers. + + """ + # RFC2616, section 4.3: "The presence of a message-body in a request is + # signaled by the inclusion of a Content-Length or Transfer-Encoding header + # field in the request's message-headers." + if self._msg.body: + self._msg.headers[HTTP_CONTENT_LENGTH] = len(self._msg.body) + + def _FormatMessage(self): + """Serializes the HTTP message into a string. + + """ + buf = StringIO() + + # Add start line + buf.write(str(self._msg.start_line)) + buf.write("\r\n") + + # Add headers + if self._msg.start_line.version != HTTP_0_9: + for name, value in self._msg.headers.iteritems(): + buf.write("%s: %s\r\n" % (name, value)) + + buf.write("\r\n") + + # Add message body if needed + if self.HasMessageBody(): + buf.write(self._msg.body) + + elif self._msg.body: + logging.warning("Ignoring message body") + + return buf.getvalue() + + def HasMessageBody(self): + """Checks whether the HTTP message contains a body. + + Can be overriden by subclasses. + + """ + return bool(self._msg.body) + + +class HttpMessageReader(object): + """Reads HTTP message from socket. + + """ + # Length limits + START_LINE_LENGTH_MAX = None + HEADER_LENGTH_MAX = None + + # Parser state machine + PS_START_LINE = "start-line" + PS_HEADERS = "headers" + PS_BODY = "entity-body" + PS_COMPLETE = "complete" + + def __init__(self, sock, msg, read_timeout): + self.sock = sock + self.msg = msg + + self.poller = select.poll() + self.start_line_buffer = None + self.header_buffer = StringIO() + self.body_buffer = StringIO() + self.parser_status = self.PS_START_LINE + self.content_length = None + self.peer_will_close = None + + buf = "" + eof = False + while self.parser_status != self.PS_COMPLETE: + data = SocketOperation(self.poller, sock, SOCKOP_RECV, 4096, + read_timeout) + + if data: + buf += data + else: + eof = True + + # Do some parsing and error checking while more data arrives + buf = self._ContinueParsing(buf, eof) + + # Must be done only after the buffer has been evaluated + # TODO: Connection-length < len(data read) and connection closed + if (eof and + self.parser_status in (self.PS_START_LINE, + self.PS_HEADERS)): + raise HttpError("Connection closed prematurely") + + # Parse rest + buf = self._ContinueParsing(buf, True) + + assert self.parser_status == self.PS_COMPLETE + assert not buf, "Parser didn't read full response" + + msg.body = self.body_buffer.getvalue() + + # TODO: Content-type, error handling + if msg.body: + msg.decoded_body = HttpJsonConverter().Decode(msg.body) + else: + msg.decoded_body = None + + if msg.decoded_body: + logging.debug("Message body: %s", msg.decoded_body) + + def _ContinueParsing(self, buf, eof): + """Main function for HTTP message state machine. + + @type buf: string + @param buf: Receive buffer + @type eof: bool + @param eof: Whether we've reached EOF on the socket + @rtype: string + @return: Updated receive buffer + + """ + if self.parser_status == self.PS_START_LINE: + # Expect start line + while True: + idx = buf.find("\r\n") + + # RFC2616, section 4.1: "In the interest of robustness, servers SHOULD + # ignore any empty line(s) received where a Request-Line is expected. + # In other words, if the server is reading the protocol stream at the + # beginning of a message and receives a CRLF first, it should ignore + # the CRLF." + if idx == 0: + # TODO: Limit number of CRLFs for safety? + buf = buf[:2] + continue + + if idx > 0: + self.start_line_buffer = buf[:idx] + + self._CheckStartLineLength(len(self.start_line_buffer)) + + # Remove status line, including CRLF + buf = buf[idx + 2:] + + self.msg.start_line = self.ParseStartLine(self.start_line_buffer) + + self.parser_status = self.PS_HEADERS + else: + # Check whether incoming data is getting too large, otherwise we just + # fill our read buffer. + self._CheckStartLineLength(len(buf)) + + break + + # TODO: Handle messages without headers + if self.parser_status == self.PS_HEADERS: + # Wait for header end + idx = buf.find("\r\n\r\n") + if idx >= 0: + self.header_buffer.write(buf[:idx + 2]) + + self._CheckHeaderLength(self.header_buffer.tell()) + + # Remove headers, including CRLF + buf = buf[idx + 4:] + + self._ParseHeaders() + + self.parser_status = self.PS_BODY + else: + # Check whether incoming data is getting too large, otherwise we just + # fill our read buffer. + self._CheckHeaderLength(len(buf)) + + if self.parser_status == self.PS_BODY: + # TODO: Implement max size for body_buffer + self.body_buffer.write(buf) + buf = "" + + # Check whether we've read everything + # + # RFC2616, section 4.4: "When a message-body is included with a message, + # the transfer-length of that body is determined by one of the following + # [...] 5. By the server closing the connection. (Closing the connection + # cannot be used to indicate the end of a request body, since that would + # leave no possibility for the server to send back a response.)" + if (eof or + self.content_length is None or + (self.content_length is not None and + self.body_buffer.tell() >= self.content_length)): + self.parser_status = self.PS_COMPLETE + + return buf + + def _CheckStartLineLength(self, length): + """Limits the start line buffer size. + + @type length: int + @param length: Buffer size + + """ + if (self.START_LINE_LENGTH_MAX is not None and + length > self.START_LINE_LENGTH_MAX): + raise HttpError("Start line longer than %d chars" % + self.START_LINE_LENGTH_MAX) + + def _CheckHeaderLength(self, length): + """Limits the header buffer size. + + @type length: int + @param length: Buffer size + + """ + if (self.HEADER_LENGTH_MAX is not None and + length > self.HEADER_LENGTH_MAX): + raise HttpError("Headers longer than %d chars" % self.HEADER_LENGTH_MAX) + + def ParseStartLine(self, start_line): + """Parses the start line of a message. + + Must be overriden by subclass. + + @type start_line: string + @param start_line: Start line string + + """ + raise NotImplementedError() + + def _WillPeerCloseConnection(self): + """Evaluate whether peer will close the connection. + + @rtype: bool + @return: Whether peer will close the connection + + """ + # RFC2616, section 14.10: "HTTP/1.1 defines the "close" connection option + # for the sender to signal that the connection will be closed after + # completion of the response. For example, + # + # Connection: close + # + # in either the request or the response header fields indicates that the + # connection SHOULD NOT be considered `persistent' (section 8.1) after the + # current request/response is complete." + + hdr_connection = self.msg.headers.get(HTTP_CONNECTION, None) + if hdr_connection: + hdr_connection = hdr_connection.lower() + + # An HTTP/1.1 server is assumed to stay open unless explicitly closed. + if self.msg.start_line.version == HTTP_1_1: + return (hdr_connection and "close" in hdr_connection) + + # Some HTTP/1.0 implementations have support for persistent connections, + # using rules different than HTTP/1.1. + + # For older HTTP, Keep-Alive indicates persistent connection. + if self.msg.headers.get(HTTP_KEEP_ALIVE): + return False + + # At least Akamai returns a "Connection: Keep-Alive" header, which was + # supposed to be sent by the client. + if hdr_connection and "keep-alive" in hdr_connection: + return False + + return True + + def _ParseHeaders(self): + """Parses the headers. + + This function also adjusts internal variables based on header values. + + RFC2616, section 4.3: "The presence of a message-body in a request is + signaled by the inclusion of a Content-Length or Transfer-Encoding header + field in the request's message-headers." + + """ + # Parse headers + self.header_buffer.seek(0, 0) + self.msg.headers = mimetools.Message(self.header_buffer, 0) + + self.peer_will_close = self._WillPeerCloseConnection() + + # Do we have a Content-Length header? + hdr_content_length = self.msg.headers.get(HTTP_CONTENT_LENGTH, None) + if hdr_content_length: + try: + self.content_length = int(hdr_content_length) + except ValueError: + self.content_length = None + if self.content_length is not None and self.content_length < 0: + self.content_length = None + + # if the connection remains open and a content-length was not provided, + # then assume that the connection WILL close. + if self.content_length is None: + self.peer_will_close = True diff --git a/lib/http/client.py b/lib/http/client.py new file mode 100644 index 0000000..50c9172 --- /dev/null +++ b/lib/http/client.py @@ -0,0 +1,388 @@ +# +# + +# Copyright (C) 2007, 2008 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + +"""HTTP client module. + +""" + +import BaseHTTPServer +import cgi +import logging +import OpenSSL +import os +import select +import socket +import sys +import time +import signal +import errno +import threading + +from ganeti import constants +from ganeti import serializer +from ganeti import workerpool +from ganeti import utils +from ganeti import http + + +HTTP_CLIENT_THREADS = 10 + + +class HttpClientRequest(object): + def __init__(self, host, port, method, path, headers=None, post_data=None, + ssl_params=None, ssl_verify_peer=False): + """Describes an HTTP request. + + @type host: string + @param host: Hostname + @type port: int + @param port: Port + @type method: string + @param method: Method name + @type path: string + @param path: Request path + @type headers: dict or None + @param headers: Additional headers to send + @type post_data: string or None + @param post_data: Additional data to send + @type ssl_params: HttpSslParams + @param ssl_params: SSL key and certificate + @type ssl_verify_peer: bool + @param ssl_verify_peer: Whether to compare our certificate with server's + certificate + + """ + if post_data is not None: + assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \ + "Only POST and GET requests support sending data" + + assert path.startswith("/"), "Path must start with slash (/)" + + # Request attributes + self.host = host + self.port = port + self.ssl_params = ssl_params + self.ssl_verify_peer = ssl_verify_peer + self.method = method + self.path = path + self.headers = headers + self.post_data = post_data + + self.success = None + self.error = None + + # Raw response + self.response = None + + # Response attributes + self.resp_version = None + self.resp_status_code = None + self.resp_reason = None + self.resp_headers = None + self.resp_body = None + + +class _HttpClientToServerMessageWriter(http.HttpMessageWriter): + pass + + +class _HttpServerToClientMessageReader(http.HttpMessageReader): + # Length limits + START_LINE_LENGTH_MAX = 512 + HEADER_LENGTH_MAX = 4096 + + def ParseStartLine(self, start_line): + """Parses the status line sent by the server. + + """ + # Empty lines are skipped when reading + assert start_line + + try: + [version, status, reason] = start_line.split(None, 2) + except ValueError: + try: + [version, status] = start_line.split(None, 1) + reason = "" + except ValueError: + version = http.HTTP_0_9 + + if version: + version = version.upper() + + # The status code is a three-digit number + try: + status = int(status) + if status < 100 or status > 999: + status = -1 + except ValueError: + status = -1 + + if status == -1: + raise http.HttpError("Invalid status code (%r)" % start_line) + + return http.HttpServerToClientStartLine(version, status, reason) + + +class HttpClientRequestExecutor(http.HttpSocketBase): + # Default headers + DEFAULT_HEADERS = { + http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION, + # TODO: For keep-alive, don't send "Connection: close" + http.HTTP_CONNECTION: "close", + } + + # Timeouts in seconds for socket layer + # TODO: Soft timeout instead of only socket timeout? + # TODO: Make read timeout configurable per OpCode? + CONNECT_TIMEOUT = 5 + WRITE_TIMEOUT = 10 + READ_TIMEOUT = None + CLOSE_TIMEOUT = 1 + + def __init__(self, req): + """Initializes the HttpClientRequestExecutor class. + + @type req: HttpClientRequest + @param req: Request object + + """ + http.HttpSocketBase.__init__(self) + self.request = req + + self.poller = select.poll() + + try: + # TODO: Implement connection caching/keep-alive + self.sock = self._CreateSocket(req.ssl_params, + req.ssl_verify_peer) + + # Disable Python's timeout + self.sock.settimeout(None) + + # Operate in non-blocking mode + self.sock.setblocking(0) + + response_msg_reader = None + response_msg = None + force_close = True + + self._Connect() + try: + self._SendRequest() + (response_msg_reader, response_msg) = self._ReadResponse() + + # Only wait for server to close if we didn't have any exception. + force_close = False + finally: + # TODO: Keep-alive is not supported, always close connection + force_close = True + http.ShutdownConnection(self.poller, self.sock, + self.CLOSE_TIMEOUT, self.WRITE_TIMEOUT, + response_msg_reader, force_close) + + self.sock.close() + self.sock = None + + req.response = response_msg + + req.resp_version = req.response.start_line.version + req.resp_status_code = req.response.start_line.code + req.resp_reason = req.response.start_line.reason + req.resp_headers = req.response.headers + req.resp_body = req.response.body + + req.success = True + req.error = None + + except http.HttpError, err: + req.success = False + req.error = str(err) + + def _Connect(self): + """Non-blocking connect to host with timeout. + + """ + connected = False + while True: + try: + connect_error = self.sock.connect_ex((self.request.host, + self.request.port)) + except socket.gaierror, err: + raise http.HttpError("Connection failed: %s" % str(err)) + + if connect_error == errno.EINTR: + # Mask signals + pass + + elif connect_error == 0: + # Connection established + connected = True + break + + elif connect_error == errno.EINPROGRESS: + # Connection started + break + + raise http.HttpError("Connection failed (%s: %s)" % + (connect_error, os.strerror(connect_error))) + + if not connected: + # Wait for connection + event = http.WaitForSocketCondition(self.poller, self.sock, + select.POLLOUT, self.CONNECT_TIMEOUT) + if event is None: + raise http.HttpError("Timeout while connecting to server") + + # Get error code + connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + if connect_error != 0: + raise http.HttpError("Connection failed (%s: %s)" % + (connect_error, os.strerror(connect_error))) + + # Enable TCP keep-alive + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + + # If needed, Linux specific options are available to change the TCP + # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and + # TCP_KEEPINTVL. + + def _SendRequest(self): + """Sends request to server. + + """ + # Headers + send_headers = self.DEFAULT_HEADERS.copy() + + if self.request.headers: + send_headers.update(self.request.headers) + + send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port) + + # Response message + msg = http.HttpMessage() + + # Combine request line. We only support HTTP/1.0 (no chunked transfers and + # no keep-alive). + # TODO: For keep-alive, change to HTTP/1.1 + msg.start_line = \ + http.HttpClientToServerStartLine(method=self.request.method.upper(), + path=self.request.path, version=http.HTTP_1_0) + msg.headers = send_headers + msg.body = self.request.post_data + + try: + _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT) + except http.HttpSocketTimeout: + raise http.HttpError("Timeout while sending request") + except socket.error, err: + raise http.HttpError("Error sending request: %s" % err) + + def _ReadResponse(self): + """Read response from server. + + """ + response_msg = http.HttpMessage() + + try: + response_msg_reader = \ + _HttpServerToClientMessageReader(self.sock, response_msg, + self.READ_TIMEOUT) + except http.HttpSocketTimeout: + raise http.HttpError("Timeout while reading response") + except socket.error, err: + raise http.HttpError("Error reading response: %s" % err) + + return (response_msg_reader, response_msg) + + +class _HttpClientPendingRequest(object): + """Data class for pending requests. + + """ + def __init__(self, request): + self.request = request + + # Thread synchronization + self.done = threading.Event() + + +class HttpClientWorker(workerpool.BaseWorker): + """HTTP client worker class. + + """ + def RunTask(self, pend_req): + try: + HttpClientRequestExecutor(pend_req.request) + finally: + pend_req.done.set() + + +class HttpClientWorkerPool(workerpool.WorkerPool): + def __init__(self, manager): + workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS, + HttpClientWorker) + self.manager = manager + + +class HttpClientManager(object): + """Manages HTTP requests. + + """ + def __init__(self): + self._wpool = HttpClientWorkerPool(self) + + def __del__(self): + self.Shutdown() + + def ExecRequests(self, requests): + """Execute HTTP requests. + + This function can be called from multiple threads at the same time. + + @type requests: List of HttpClientRequest instances + @param requests: The requests to execute + @rtype: List of HttpClientRequest instances + @returns: The list of requests passed in + + """ + # _HttpClientPendingRequest is used for internal thread synchronization + pending = [_HttpClientPendingRequest(req) for req in requests] + + try: + # Add requests to queue + for pend_req in pending: + self._wpool.AddTask(pend_req) + + finally: + # In case of an exception we should still wait for the rest, otherwise + # another thread from the worker pool could modify the request object + # after we returned. + + # And wait for them to finish + for pend_req in pending: + pend_req.done.wait() + + # Return original list + return requests + + def Shutdown(self): + self._wpool.Quiesce() + self._wpool.TerminateWorkers() diff --git a/lib/http/server.py b/lib/http/server.py new file mode 100644 index 0000000..d6f2b88 --- /dev/null +++ b/lib/http/server.py @@ -0,0 +1,484 @@ +# +# + +# Copyright (C) 2007, 2008 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + +"""HTTP server module. + +""" + +import BaseHTTPServer +import cgi +import logging +import os +import select +import socket +import time +import signal + +from ganeti import constants +from ganeti import serializer +from ganeti import utils +from ganeti import http + + +WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] +MONTHNAME = [None, + 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', + 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] + +# Default error message +DEFAULT_ERROR_CONTENT_TYPE = "text/html" +DEFAULT_ERROR_MESSAGE = """\ + + +Error response + + +

Error response

+

Error code %(code)d. +

Message: %(message)s. +

Error code explanation: %(code)s = %(explain)s. + + +""" + + +def _DateTimeHeader(): + """Return the current date and time formatted for a message header. + + """ + (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime() + return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" % + (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss)) + + +class _HttpServerRequest(object): + """Data structure for HTTP request on server side. + + """ + def __init__(self, request_msg): + # Request attributes + self.request_method = request_msg.start_line.method + self.request_path = request_msg.start_line.path + self.request_headers = request_msg.headers + self.request_body = request_msg.decoded_body + + # Response attributes + self.resp_headers = {} + + +class _HttpServerToClientMessageWriter(http.HttpMessageWriter): + """Writes an HTTP response to client. + + """ + def __init__(self, sock, request_msg, response_msg, write_timeout): + """TODO + + """ + self._request_msg = request_msg + self._response_msg = response_msg + http.HttpMessageWriter.__init__(self, sock, response_msg, write_timeout) + + def HasMessageBody(self): + """Logic to detect whether response should contain a message body. + + """ + if self._request_msg.start_line: + request_method = self._request_msg.start_line.method + else: + request_method = None + + response_code = self._response_msg.start_line.code + + # RFC2616, section 4.3: "A message-body MUST NOT be included in a request + # if the specification of the request method (section 5.1.1) does not allow + # sending an entity-body in requests" + # + # RFC2616, section 9.4: "The HEAD method is identical to GET except that + # the server MUST NOT return a message-body in the response." + # + # RFC2616, section 10.2.5: "The 204 response MUST NOT include a + # message-body [...]" + # + # RFC2616, section 10.3.5: "The 304 response MUST NOT contain a + # message-body, [...]" + + return (http.HttpMessageWriter.HasMessageBody(self) and + (request_method is not None and request_method != http.HTTP_HEAD) and + response_code >= http.HTTP_OK and + response_code not in (http.HTTP_NO_CONTENT, http.HTTP_NOT_MODIFIED)) + + +class _HttpClientToServerMessageReader(http.HttpMessageReader): + """Reads an HTTP request sent by client. + + """ + # Length limits + START_LINE_LENGTH_MAX = 4096 + HEADER_LENGTH_MAX = 4096 + + def ParseStartLine(self, start_line): + """Parses the start line sent by client. + + Example: "GET /index.html HTTP/1.1" + + @type start_line: string + @param start_line: Start line + + """ + # Empty lines are skipped when reading + assert start_line + + logging.debug("HTTP request: %s", start_line) + + words = start_line.split() + + if len(words) == 3: + [method, path, version] = words + if version[:5] != 'HTTP/': + raise http.HttpBadRequest("Bad request version (%r)" % version) + + try: + base_version_number = version.split("/", 1)[1] + version_number = base_version_number.split(".") + + # RFC 2145 section 3.1 says there can be only one "." and + # - major and minor numbers MUST be treated as + # separate integers; + # - HTTP/2.4 is a lower version than HTTP/2.13, which in + # turn is lower than HTTP/12.3; + # - Leading zeros MUST be ignored by recipients. + if len(version_number) != 2: + raise http.HttpBadRequest("Bad request version (%r)" % version) + + version_number = (int(version_number[0]), int(version_number[1])) + except (ValueError, IndexError): + raise http.HttpBadRequest("Bad request version (%r)" % version) + + if version_number >= (2, 0): + raise http.HttpVersionNotSupported("Invalid HTTP Version (%s)" % + base_version_number) + + elif len(words) == 2: + version = http.HTTP_0_9 + [method, path] = words + if method != http.HTTP_GET: + raise http.HttpBadRequest("Bad HTTP/0.9 request type (%r)" % method) + + else: + raise http.HttpBadRequest("Bad request syntax (%r)" % start_line) + + return http.HttpClientToServerStartLine(method, path, version) + + +class _HttpServerRequestExecutor(object): + """Implements server side of HTTP. + + This class implements the server side of HTTP. It's based on code of Python's + BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII + character encodings. Keep-alive connections are not supported. + + """ + # The default request version. This only affects responses up until + # the point where the request line is parsed, so it mainly decides what + # the client gets back when sending a malformed request line. + # Most web servers default to HTTP 0.9, i.e. don't send a status line. + default_request_version = http.HTTP_0_9 + + # Error message settings + error_message_format = DEFAULT_ERROR_MESSAGE + error_content_type = DEFAULT_ERROR_CONTENT_TYPE + + responses = BaseHTTPServer.BaseHTTPRequestHandler.responses + + # Timeouts in seconds for socket layer + WRITE_TIMEOUT = 10 + READ_TIMEOUT = 10 + CLOSE_TIMEOUT = 1 + + def __init__(self, server, sock, client_addr): + """Initializes this class. + + """ + self.server = server + self.sock = sock + self.client_addr = client_addr + + self.poller = select.poll() + + self.request_msg = http.HttpMessage() + self.response_msg = http.HttpMessage() + + self.response_msg.start_line = \ + http.HttpServerToClientStartLine(version=self.default_request_version, + code=None, reason=None) + + # Disable Python's timeout + self.sock.settimeout(None) + + # Operate in non-blocking mode + self.sock.setblocking(0) + + logging.info("Connection from %s:%s", client_addr[0], client_addr[1]) + try: + request_msg_reader = None + force_close = True + try: + try: + try: + request_msg_reader = self._ReadRequest() + self._HandleRequest() + + # Only wait for client to close if we didn't have any exception. + force_close = False + except http.HttpException, err: + self._SetErrorStatus(err) + finally: + # Try to send a response + self._SendResponse() + finally: + http.ShutdownConnection(self.poller, sock, + self.CLOSE_TIMEOUT, self.WRITE_TIMEOUT, + request_msg_reader, force_close) + + self.sock.close() + self.sock = None + finally: + logging.info("Disconnected %s:%s", client_addr[0], client_addr[1]) + + def _ReadRequest(self): + """Reads a request sent by client. + + """ + try: + request_msg_reader = \ + _HttpClientToServerMessageReader(self.sock, self.request_msg, + self.READ_TIMEOUT) + except http.HttpSocketTimeout: + raise http.HttpError("Timeout while reading request") + except socket.error, err: + raise http.HttpError("Error reading request: %s" % err) + + self.response_msg.start_line.version = self.request_msg.start_line.version + + return request_msg_reader + + def _HandleRequest(self): + """Calls the handler function for the current request. + + """ + handler_context = _HttpServerRequest(self.request_msg) + + try: + result = self.server.HandleRequest(handler_context) + except (http.HttpException, KeyboardInterrupt, SystemExit): + raise + except Exception, err: + logging.exception("Caught exception") + raise http.HttpInternalError(message=str(err)) + except: + logging.exception("Unknown exception") + raise http.HttpInternalError(message="Unknown error") + + # TODO: Content-type + encoder = http.HttpJsonConverter() + self.response_msg.start_line.code = http.HTTP_OK + self.response_msg.body = encoder.Encode(result) + self.response_msg.headers = handler_context.resp_headers + self.response_msg.headers[http.HTTP_CONTENT_TYPE] = encoder.CONTENT_TYPE + + def _SendResponse(self): + """Sends the response to the client. + + """ + if self.response_msg.start_line.code is None: + return + + if not self.response_msg.headers: + self.response_msg.headers = {} + + self.response_msg.headers.update({ + # TODO: Keep-alive is not supported + http.HTTP_CONNECTION: "close", + http.HTTP_DATE: _DateTimeHeader(), + http.HTTP_SERVER: http.HTTP_GANETI_VERSION, + }) + + # Get response reason based on code + response_code = self.response_msg.start_line.code + if response_code in self.responses: + response_reason = self.responses[response_code][0] + else: + response_reason = "" + self.response_msg.start_line.reason = response_reason + + logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1], + self.request_msg.start_line, response_code) + + try: + _HttpServerToClientMessageWriter(self.sock, self.request_msg, + self.response_msg, self.WRITE_TIMEOUT) + except http.HttpSocketTimeout: + raise http.HttpError("Timeout while sending response") + except socket.error, err: + raise http.HttpError("Error sending response: %s" % err) + + def _SetErrorStatus(self, err): + """Sets the response code and body from a HttpException. + + @type err: HttpException + @param err: Exception instance + + """ + try: + (shortmsg, longmsg) = self.responses[err.code] + except KeyError: + shortmsg = longmsg = "Unknown" + + if err.message: + message = err.message + else: + message = shortmsg + + values = { + "code": err.code, + "message": cgi.escape(message), + "explain": longmsg, + } + + self.response_msg.start_line.code = err.code + self.response_msg.headers = { + http.HTTP_CONTENT_TYPE: self.error_content_type, + } + self.response_msg.body = self.error_message_format % values + + +class HttpServer(http.HttpSocketBase): + """Generic HTTP server class + + Users of this class must subclass it and override the HandleRequest function. + + """ + MAX_CHILDREN = 20 + + def __init__(self, mainloop, local_address, port, + ssl_params=None, ssl_verify_peer=False): + """Initializes the HTTP server + + @type mainloop: ganeti.daemon.Mainloop + @param mainloop: Mainloop used to poll for I/O events + @type local_addess: string + @param local_address: Local IP address to bind to + @type port: int + @param port: TCP port to listen on + @type ssl_params: HttpSslParams + @param ssl_params: SSL key and certificate + @type ssl_verify_peer: bool + @param ssl_verify_peer: Whether to require client certificate and compare + it with our certificate + + """ + http.HttpSocketBase.__init__(self) + + self.mainloop = mainloop + self.local_address = local_address + self.port = port + + self.socket = self._CreateSocket(ssl_params, ssl_verify_peer) + + # Allow port to be reused + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + self._children = [] + + mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN) + mainloop.RegisterSignal(self) + + def Start(self): + self.socket.bind((self.local_address, self.port)) + self.socket.listen(5) + + def Stop(self): + self.socket.close() + + def OnIO(self, fd, condition): + if condition & select.POLLIN: + self._IncomingConnection() + + def OnSignal(self, signum): + if signum == signal.SIGCHLD: + self._CollectChildren(True) + + def _CollectChildren(self, quick): + """Checks whether any child processes are done + + @type quick: bool + @param quick: Whether to only use non-blocking functions + + """ + if not quick: + # Don't wait for other processes if it should be a quick check + while len(self._children) > self.MAX_CHILDREN: + try: + # Waiting without a timeout brings us into a potential DoS situation. + # As soon as too many children run, we'll not respond to new + # requests. The real solution would be to add a timeout for children + # and killing them after some time. + pid, status = os.waitpid(0, 0) + except os.error: + pid = None + if pid and pid in self._children: + self._children.remove(pid) + + for child in self._children: + try: + pid, status = os.waitpid(child, os.WNOHANG) + except os.error: + pid = None + if pid and pid in self._children: + self._children.remove(pid) + + def _IncomingConnection(self): + """Called for each incoming connection + + """ + (connection, client_addr) = self.socket.accept() + + self._CollectChildren(False) + + pid = os.fork() + if pid == 0: + # Child process + try: + _HttpServerRequestExecutor(self, connection, client_addr) + except Exception: + logging.exception("Error while handling request from %s:%s", + client_addr[0], client_addr[1]) + os._exit(1) + os._exit(0) + else: + self._children.append(pid) + + def HandleRequest(self, req): + """Handles a request. + + Must be overriden by subclass. + + """ + raise NotImplementedError() -- 1.7.10.4