4 # Copyright (C) 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
32 from cStringIO import StringIO
34 from ganeti import constants
35 from ganeti import serializer
36 from ganeti import utils
39 HTTP_CLIENT_THREADS = 10
41 HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
45 HTTP_NOT_MODIFIED = 304
58 HTTP_SERVER = "Server"
60 HTTP_USER_AGENT = "User-Agent"
61 HTTP_CONTENT_TYPE = "Content-Type"
62 HTTP_CONTENT_LENGTH = "Content-Length"
63 HTTP_CONNECTION = "Connection"
64 HTTP_KEEP_ALIVE = "Keep-Alive"
66 _SSL_UNEXPECTED_EOF = "Unexpected EOF"
71 SOCKOP_SHUTDOWN) = range(3)
74 class HttpError(Exception):
75 """Internal exception for HTTP errors.
77 This should only be used for internal error reporting.
82 class _HttpClientError(Exception):
83 """Internal exception for HTTP client errors.
85 This should only be used for internal error reporting.
90 class HttpSocketTimeout(Exception):
91 """Internal exception for socket timeouts.
93 This should only be used for internal error reporting.
98 class HttpException(Exception):
102 def __init__(self, message=None):
103 Exception.__init__(self)
104 if message is not None:
105 self.message = message
108 class HttpBadRequest(HttpException):
112 class HttpForbidden(HttpException):
116 class HttpNotFound(HttpException):
120 class HttpGone(HttpException):
124 class HttpLengthRequired(HttpException):
128 class HttpInternalError(HttpException):
132 class HttpNotImplemented(HttpException):
136 class HttpServiceUnavailable(HttpException):
140 class HttpVersionNotSupported(HttpException):
144 class HttpJsonConverter:
145 CONTENT_TYPE = "application/json"
147 def Encode(self, data):
148 return serializer.DumpJson(data)
150 def Decode(self, data):
151 return serializer.LoadJson(data)
154 def WaitForSocketCondition(poller, sock, event, timeout):
155 """Waits for a condition to occur on the socket.
157 @type poller: select.Poller
158 @param poller: Poller object as created by select.poll()
160 @param socket: Wait for events on this socket
162 @param event: ORed condition (see select module)
163 @type timeout: float or None
164 @param timeout: Timeout in seconds
166 @return: None for timeout, otherwise occured conditions
169 check = (event | select.POLLPRI |
170 select.POLLNVAL | select.POLLHUP | select.POLLERR)
172 if timeout is not None:
173 # Poller object expects milliseconds
176 poller.register(sock, event)
179 # TODO: If the main thread receives a signal and we have no timeout, we
180 # could wait forever. This should check a global "quit" flag or
181 # something every so often.
182 io_events = poller.poll(timeout)
186 for (evfd, evcond) in io_events:
190 poller.unregister(sock)
193 def SocketOperation(poller, sock, op, arg1, timeout):
194 """Wrapper around socket functions.
196 This function abstracts error handling for socket operations, especially
197 for the complicated interaction with OpenSSL.
199 @type poller: select.Poller
200 @param poller: Poller object as created by select.poll()
202 @param socket: Socket for the operation
204 @param op: Operation to execute (SOCKOP_* constants)
206 @param arg1: Parameter for function (if needed)
207 @type timeout: None or float
208 @param timeout: Timeout in seconds or None
211 # TODO: event_poll/event_check/override
212 if op == SOCKOP_SEND:
213 event_poll = select.POLLOUT
214 event_check = select.POLLOUT
216 elif op == SOCKOP_RECV:
217 event_poll = select.POLLIN
218 event_check = select.POLLIN | select.POLLPRI
220 elif op == SOCKOP_SHUTDOWN:
224 # The timeout is only used when OpenSSL requests polling for a condition.
225 # It is not advisable to have no timeout for shutdown.
229 raise AssertionError("Invalid socket operation")
231 # No override by default
235 # Poll only for certain operations and when asked for by an override
236 if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
238 wait_for_event = event_override
240 wait_for_event = event_poll
242 event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
244 raise HttpSocketTimeout()
246 if (op == SOCKOP_RECV and
247 event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
250 if not event & wait_for_event:
258 if op == SOCKOP_SEND:
259 return sock.send(arg1)
261 elif op == SOCKOP_RECV:
262 return sock.recv(arg1)
264 elif op == SOCKOP_SHUTDOWN:
265 if isinstance(sock, OpenSSL.SSL.ConnectionType):
266 # PyOpenSSL's shutdown() doesn't take arguments
267 return sock.shutdown()
269 return sock.shutdown(arg1)
271 except OpenSSL.SSL.WantWriteError:
272 # OpenSSL wants to write, poll for POLLOUT
273 event_override = select.POLLOUT
276 except OpenSSL.SSL.WantReadError:
277 # OpenSSL wants to read, poll for POLLIN
278 event_override = select.POLLIN | select.POLLPRI
281 except OpenSSL.SSL.WantX509LookupError:
284 except OpenSSL.SSL.SysCallError, err:
285 if op == SOCKOP_SEND:
286 # arg1 is the data when writing
287 if err.args and err.args[0] == -1 and arg1 == "":
288 # errors when writing empty strings are expected
292 elif op == SOCKOP_RECV:
293 if err.args == (-1, _SSL_UNEXPECTED_EOF):
296 raise socket.error(err.args)
298 except OpenSSL.SSL.Error, err:
299 raise socket.error(err.args)
301 except socket.error, err:
302 if err.args and err.args[0] == errno.EAGAIN:
309 def ShutdownConnection(poller, sock, close_timeout, write_timeout, msgreader,
311 """Closes the connection.
314 poller = select.poll()
316 #print msgreader.peer_will_close, force
317 if msgreader and msgreader.peer_will_close and not force:
318 # Wait for peer to close
320 # Check whether it's actually closed
321 if not SocketOperation(poller, sock, SOCKOP_RECV, 1, close_timeout):
323 except (socket.error, HttpError, HttpSocketTimeout):
324 # Ignore errors at this stage
327 # Close the connection from our side
329 SocketOperation(poller, sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
331 except HttpSocketTimeout:
332 raise HttpError("Timeout while shutting down connection")
333 except socket.error, err:
334 raise HttpError("Error while shutting down connection: %s" % err)
337 class HttpSslParams(object):
338 """Data class for SSL key and certificate.
341 def __init__(self, ssl_key_path, ssl_cert_path):
342 """Initializes this class.
344 @type ssl_key_path: string
345 @param ssl_key_path: Path to file containing SSL key in PEM format
346 @type ssl_cert_path: string
347 @param ssl_cert_path: Path to file containing SSL certificate in PEM format
350 self.ssl_key_pem = utils.ReadFile(ssl_key_path)
351 self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
354 return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
357 def GetCertificate(self):
358 return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
362 class HttpSocketBase(object):
363 """Base class for HTTP server and client.
367 self._using_ssl = None
368 self._ssl_params = None
370 self._ssl_cert = None
372 def _CreateSocket(self, ssl_params, ssl_verify_peer):
373 """Creates a TCP socket and initializes SSL if needed.
375 @type ssl_params: HttpSslParams
376 @param ssl_params: SSL key and certificate
377 @type ssl_verify_peer: bool
378 @param ssl_verify_peer: Whether to require client certificate and compare
379 it with our certificate
382 self._ssl_params = ssl_params
384 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
386 # Should we enable SSL?
387 self._using_ssl = ssl_params is not None
389 if not self._using_ssl:
392 self._ssl_key = ssl_params.GetKey()
393 self._ssl_cert = ssl_params.GetCertificate()
395 ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
396 ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
398 ctx.use_privatekey(self._ssl_key)
399 ctx.use_certificate(self._ssl_cert)
400 ctx.check_privatekey()
403 ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
404 OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
405 self._SSLVerifyCallback)
407 return OpenSSL.SSL.Connection(ctx, sock)
409 def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
410 """Verify the certificate provided by the peer
412 We only compare fingerprints. The client must use the same certificate as
416 assert self._ssl_params, "SSL not initialized"
418 return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
419 self._ssl_cert.digest("md5") == cert.digest("md5"))
422 class HttpMessage(object):
423 """Data structure for HTTP message.
427 self.start_line = None
430 self.decoded_body = None
433 class HttpClientToServerStartLine(object):
434 """Data structure for HTTP request start line.
437 def __init__(self, method, path, version):
440 self.version = version
443 return "%s %s %s" % (self.method, self.path, self.version)
446 class HttpServerToClientStartLine(object):
447 """Data structure for HTTP response start line.
450 def __init__(self, version, code, reason):
451 self.version = version
456 return "%s %s %s" % (self.version, self.code, self.reason)
459 class HttpMessageWriter(object):
460 """Writes an HTTP message to a socket.
463 def __init__(self, sock, msg, write_timeout):
466 self._PrepareMessage()
468 buf = self._FormatMessage()
470 poller = select.poll()
472 # Send only 4 KB at a time
475 sent = SocketOperation(poller, sock, SOCKOP_SEND, data,
481 assert not buf, "Message wasn't sent completely"
483 def _PrepareMessage(self):
484 """Prepares the HTTP message by setting mandatory headers.
487 # RFC2616, section 4.3: "The presence of a message-body in a request is
488 # signaled by the inclusion of a Content-Length or Transfer-Encoding header
489 # field in the request's message-headers."
491 self._msg.headers[HTTP_CONTENT_LENGTH] = len(self._msg.body)
493 def _FormatMessage(self):
494 """Serializes the HTTP message into a string.
500 buf.write(str(self._msg.start_line))
504 if self._msg.start_line.version != HTTP_0_9:
505 for name, value in self._msg.headers.iteritems():
506 buf.write("%s: %s\r\n" % (name, value))
510 # Add message body if needed
511 if self.HasMessageBody():
512 buf.write(self._msg.body)
515 logging.warning("Ignoring message body")
517 return buf.getvalue()
519 def HasMessageBody(self):
520 """Checks whether the HTTP message contains a body.
522 Can be overriden by subclasses.
525 return bool(self._msg.body)
528 class HttpMessageReader(object):
529 """Reads HTTP message from socket.
533 START_LINE_LENGTH_MAX = None
534 HEADER_LENGTH_MAX = None
536 # Parser state machine
537 PS_START_LINE = "start-line"
538 PS_HEADERS = "headers"
539 PS_BODY = "entity-body"
540 PS_COMPLETE = "complete"
542 def __init__(self, sock, msg, read_timeout):
546 self.poller = select.poll()
547 self.start_line_buffer = None
548 self.header_buffer = StringIO()
549 self.body_buffer = StringIO()
550 self.parser_status = self.PS_START_LINE
551 self.content_length = None
552 self.peer_will_close = None
556 while self.parser_status != self.PS_COMPLETE:
557 data = SocketOperation(self.poller, sock, SOCKOP_RECV, 4096,
565 # Do some parsing and error checking while more data arrives
566 buf = self._ContinueParsing(buf, eof)
568 # Must be done only after the buffer has been evaluated
569 # TODO: Connection-length < len(data read) and connection closed
571 self.parser_status in (self.PS_START_LINE,
573 raise HttpError("Connection closed prematurely")
576 buf = self._ContinueParsing(buf, True)
578 assert self.parser_status == self.PS_COMPLETE
579 assert not buf, "Parser didn't read full response"
581 msg.body = self.body_buffer.getvalue()
583 # TODO: Content-type, error handling
585 msg.decoded_body = HttpJsonConverter().Decode(msg.body)
587 msg.decoded_body = None
590 logging.debug("Message body: %s", msg.decoded_body)
592 def _ContinueParsing(self, buf, eof):
593 """Main function for HTTP message state machine.
596 @param buf: Receive buffer
598 @param eof: Whether we've reached EOF on the socket
600 @return: Updated receive buffer
603 if self.parser_status == self.PS_START_LINE:
606 idx = buf.find("\r\n")
608 # RFC2616, section 4.1: "In the interest of robustness, servers SHOULD
609 # ignore any empty line(s) received where a Request-Line is expected.
610 # In other words, if the server is reading the protocol stream at the
611 # beginning of a message and receives a CRLF first, it should ignore
614 # TODO: Limit number of CRLFs for safety?
619 self.start_line_buffer = buf[:idx]
621 self._CheckStartLineLength(len(self.start_line_buffer))
623 # Remove status line, including CRLF
626 self.msg.start_line = self.ParseStartLine(self.start_line_buffer)
628 self.parser_status = self.PS_HEADERS
630 # Check whether incoming data is getting too large, otherwise we just
631 # fill our read buffer.
632 self._CheckStartLineLength(len(buf))
636 # TODO: Handle messages without headers
637 if self.parser_status == self.PS_HEADERS:
638 # Wait for header end
639 idx = buf.find("\r\n\r\n")
641 self.header_buffer.write(buf[:idx + 2])
643 self._CheckHeaderLength(self.header_buffer.tell())
645 # Remove headers, including CRLF
650 self.parser_status = self.PS_BODY
652 # Check whether incoming data is getting too large, otherwise we just
653 # fill our read buffer.
654 self._CheckHeaderLength(len(buf))
656 if self.parser_status == self.PS_BODY:
657 # TODO: Implement max size for body_buffer
658 self.body_buffer.write(buf)
661 # Check whether we've read everything
663 # RFC2616, section 4.4: "When a message-body is included with a message,
664 # the transfer-length of that body is determined by one of the following
665 # [...] 5. By the server closing the connection. (Closing the connection
666 # cannot be used to indicate the end of a request body, since that would
667 # leave no possibility for the server to send back a response.)"
669 self.content_length is None or
670 (self.content_length is not None and
671 self.body_buffer.tell() >= self.content_length)):
672 self.parser_status = self.PS_COMPLETE
676 def _CheckStartLineLength(self, length):
677 """Limits the start line buffer size.
680 @param length: Buffer size
683 if (self.START_LINE_LENGTH_MAX is not None and
684 length > self.START_LINE_LENGTH_MAX):
685 raise HttpError("Start line longer than %d chars" %
686 self.START_LINE_LENGTH_MAX)
688 def _CheckHeaderLength(self, length):
689 """Limits the header buffer size.
692 @param length: Buffer size
695 if (self.HEADER_LENGTH_MAX is not None and
696 length > self.HEADER_LENGTH_MAX):
697 raise HttpError("Headers longer than %d chars" % self.HEADER_LENGTH_MAX)
699 def ParseStartLine(self, start_line):
700 """Parses the start line of a message.
702 Must be overriden by subclass.
704 @type start_line: string
705 @param start_line: Start line string
708 raise NotImplementedError()
710 def _WillPeerCloseConnection(self):
711 """Evaluate whether peer will close the connection.
714 @return: Whether peer will close the connection
717 # RFC2616, section 14.10: "HTTP/1.1 defines the "close" connection option
718 # for the sender to signal that the connection will be closed after
719 # completion of the response. For example,
723 # in either the request or the response header fields indicates that the
724 # connection SHOULD NOT be considered `persistent' (section 8.1) after the
725 # current request/response is complete."
727 hdr_connection = self.msg.headers.get(HTTP_CONNECTION, None)
729 hdr_connection = hdr_connection.lower()
731 # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
732 if self.msg.start_line.version == HTTP_1_1:
733 return (hdr_connection and "close" in hdr_connection)
735 # Some HTTP/1.0 implementations have support for persistent connections,
736 # using rules different than HTTP/1.1.
738 # For older HTTP, Keep-Alive indicates persistent connection.
739 if self.msg.headers.get(HTTP_KEEP_ALIVE):
742 # At least Akamai returns a "Connection: Keep-Alive" header, which was
743 # supposed to be sent by the client.
744 if hdr_connection and "keep-alive" in hdr_connection:
749 def _ParseHeaders(self):
750 """Parses the headers.
752 This function also adjusts internal variables based on header values.
754 RFC2616, section 4.3: "The presence of a message-body in a request is
755 signaled by the inclusion of a Content-Length or Transfer-Encoding header
756 field in the request's message-headers."
760 self.header_buffer.seek(0, 0)
761 self.msg.headers = mimetools.Message(self.header_buffer, 0)
763 self.peer_will_close = self._WillPeerCloseConnection()
765 # Do we have a Content-Length header?
766 hdr_content_length = self.msg.headers.get(HTTP_CONTENT_LENGTH, None)
767 if hdr_content_length:
769 self.content_length = int(hdr_content_length)
771 self.content_length = None
772 if self.content_length is not None and self.content_length < 0:
773 self.content_length = None
775 # if the connection remains open and a content-length was not provided,
776 # then assume that the connection WILL close.
777 if self.content_length is None:
778 self.peer_will_close = True