3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 # General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 """HTTP server module.
37 from cStringIO import StringIO
39 from ganeti import constants
40 from ganeti import serializer
41 from ganeti import workerpool
42 from ganeti import utils
45 HTTP_CLIENT_THREADS = 10
47 HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
49 WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
51 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
52 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
54 # Default error message
55 DEFAULT_ERROR_CONTENT_TYPE = "text/html"
56 DEFAULT_ERROR_MESSAGE = """\
58 <title>Error response</title>
61 <h1>Error response</h1>
62 <p>Error code %(code)d.
63 <p>Message: %(message)s.
64 <p>Error code explanation: %(code)s = %(explain)s.
70 HTTP_NOT_MODIFIED = 304
83 HTTP_SERVER = "Server"
85 HTTP_USER_AGENT = "User-Agent"
86 HTTP_CONTENT_TYPE = "Content-Type"
87 HTTP_CONTENT_LENGTH = "Content-Length"
88 HTTP_CONNECTION = "Connection"
89 HTTP_KEEP_ALIVE = "Keep-Alive"
91 _SSL_UNEXPECTED_EOF = "Unexpected EOF"
96 SOCKOP_SHUTDOWN) = range(3)
99 class SocketClosed(socket.error):
103 class _HttpClientError(Exception):
104 """Internal exception for HTTP client errors.
106 This should only be used for internal error reporting.
111 class _HttpSocketTimeout(Exception):
112 """Internal exception for socket timeouts.
114 This should only be used for internal error reporting.
119 class HTTPException(Exception):
123 def __init__(self, message=None):
124 Exception.__init__(self)
125 if message is not None:
126 self.message = message
129 class HTTPBadRequest(HTTPException):
133 class HTTPForbidden(HTTPException):
137 class HTTPNotFound(HTTPException):
141 class HTTPGone(HTTPException):
145 class HTTPLengthRequired(HTTPException):
149 class HTTPInternalError(HTTPException):
153 class HTTPNotImplemented(HTTPException):
157 class HTTPServiceUnavailable(HTTPException):
161 class HTTPVersionNotSupported(HTTPException):
class HTTPJsonConverter:
  """Converts data to and from its JSON representation.

  The actual work is delegated to the serializer module; CONTENT_TYPE is the
  MIME type matching the encoded data.

  """
  CONTENT_TYPE = "application/json"

  def Encode(self, data):
    """Returns the result of serializer.DumpJson for data.

    """
    encoded = serializer.DumpJson(data)
    return encoded

  def Decode(self, data):
    """Returns the result of serializer.LoadJson for data.

    """
    decoded = serializer.LoadJson(data)
    return decoded
175 def WaitForSocketCondition(poller, sock, event, timeout):
176 """Waits for a condition to occur on the socket.
178 @type poller: select.Poller
179 @param poller: Poller object as created by select.poll()
181 @param sock: Wait for events on this socket
183 @param event: ORed condition (see select module)
184 @type timeout: float or None
185 @param timeout: Timeout in seconds
187 @return: None for timeout, otherwise occurred conditions
190 check = (event | select.POLLPRI |
191 select.POLLNVAL | select.POLLHUP | select.POLLERR)
193 if timeout is not None:
194 # Poller object expects milliseconds
197 poller.register(sock, event)
200 # TODO: If the main thread receives a signal and we have no timeout, we
201 # could wait forever. This should check a global "quit" flag or
202 # something every so often.
203 io_events = poller.poll(timeout)
207 for (evfd, evcond) in io_events:
211 poller.unregister(sock)
214 def SocketOperation(poller, sock, op, arg1, timeout):
215 """Wrapper around socket functions.
217 This function abstracts error handling for socket operations, especially
218 for the complicated interaction with OpenSSL.
220 @type poller: select.Poller
221 @param poller: Poller object as created by select.poll()
223 @param sock: Socket for the operation
225 @param op: Operation to execute (SOCKOP_* constants)
227 @param arg1: Parameter for function (if needed)
228 @type timeout: None or float
229 @param timeout: Timeout in seconds or None
232 # TODO: event_poll/event_check/override
233 if op == SOCKOP_SEND:
234 event_poll = select.POLLOUT
235 event_check = select.POLLOUT
237 elif op == SOCKOP_RECV:
238 event_poll = select.POLLIN
239 event_check = select.POLLIN | select.POLLPRI
241 elif op == SOCKOP_SHUTDOWN:
245 # The timeout is only used when OpenSSL requests polling for a condition.
246 # It is not advisable to have no timeout for shutdown.
250 raise AssertionError("Invalid socket operation")
252 # No override by default
256 # Poll only for certain operations and when asked for by an override
257 if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
259 wait_for_event = event_override
261 wait_for_event = event_poll
263 event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
265 raise _HttpSocketTimeout()
267 if (op == SOCKOP_RECV and
268 event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
271 if not event & wait_for_event:
279 if op == SOCKOP_SEND:
280 return sock.send(arg1)
282 elif op == SOCKOP_RECV:
283 return sock.recv(arg1)
285 elif op == SOCKOP_SHUTDOWN:
286 if isinstance(sock, OpenSSL.SSL.ConnectionType):
287 # PyOpenSSL's shutdown() doesn't take arguments
288 return sock.shutdown()
290 return sock.shutdown(arg1)
292 except OpenSSL.SSL.WantWriteError:
293 # OpenSSL wants to write, poll for POLLOUT
294 event_override = select.POLLOUT
297 except OpenSSL.SSL.WantReadError:
298 # OpenSSL wants to read, poll for POLLIN
299 event_override = select.POLLIN | select.POLLPRI
302 except OpenSSL.SSL.WantX509LookupError:
305 except OpenSSL.SSL.SysCallError, err:
306 if op == SOCKOP_SEND:
307 # arg1 is the data when writing
308 if err.args and err.args[0] == -1 and arg1 == "":
309 # errors when writing empty strings are expected
313 elif op == SOCKOP_RECV:
314 if err.args == (-1, _SSL_UNEXPECTED_EOF):
317 raise socket.error(err.args)
319 except OpenSSL.SSL.Error, err:
320 raise socket.error(err.args)
322 except socket.error, err:
323 if err.args and err.args[0] == errno.EAGAIN:
330 class HttpSslParams(object):
331 """Data class for SSL key and certificate.
334 def __init__(self, ssl_key_path, ssl_cert_path):
335 """Initializes this class.
337 @type ssl_key_path: string
338 @param ssl_key_path: Path to file containing SSL key in PEM format
339 @type ssl_cert_path: string
340 @param ssl_cert_path: Path to file containing SSL certificate in PEM format
343 self.ssl_key_pem = utils.ReadFile(ssl_key_path)
344 self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
347 return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
350 def GetCertificate(self):
351 return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
355 class _HttpSocketBase(object):
356 """Base class for HTTP server and client.
360 self._using_ssl = None
361 self._ssl_params = None
363 self._ssl_cert = None
365 def _CreateSocket(self, ssl_params, ssl_verify_peer):
366 """Creates a TCP socket and initializes SSL if needed.
368 @type ssl_params: HttpSslParams
369 @param ssl_params: SSL key and certificate
370 @type ssl_verify_peer: bool
371 @param ssl_verify_peer: Whether to require client certificate and compare
372 it with our certificate
375 self._ssl_params = ssl_params
377 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
379 # Should we enable SSL?
380 self._using_ssl = ssl_params is not None
382 if not self._using_ssl:
385 self._ssl_key = ssl_params.GetKey()
386 self._ssl_cert = ssl_params.GetCertificate()
388 ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
389 ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
391 ctx.use_privatekey(self._ssl_key)
392 ctx.use_certificate(self._ssl_cert)
393 ctx.check_privatekey()
396 ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
397 OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
398 self._SSLVerifyCallback)
400 return OpenSSL.SSL.Connection(ctx, sock)
402 def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
403 """Verify the certificate provided by the peer
405 We only compare fingerprints. The client must use the same certificate as
409 assert self._ssl_params, "SSL not initialized"
411 return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
412 self._ssl_cert.digest("md5") == cert.digest("md5"))
415 class HttpServerRequestExecutor(object):
416 """Implements server side of HTTP
418 This class implements the server side of HTTP. It's based on code of Python's
419 BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
420 character encodings. Keep-alive connections are not supported.
423 # The default request version. This only affects responses up until
424 # the point where the request line is parsed, so it mainly decides what
425 # the client gets back when sending a malformed request line.
426 # Most web servers default to HTTP 0.9, i.e. don't send a status line.
427 default_request_version = HTTP_0_9
429 # Error message settings
430 error_message_format = DEFAULT_ERROR_MESSAGE
431 error_content_type = DEFAULT_ERROR_CONTENT_TYPE
433 responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
435 def __init__(self, server, conn, client_addr, fileio_class):
436 """Initializes this class.
438 Part of the initialization is reading the request and any POST/PUT
439 data sent by the client.
442 self._server = server
444 # We default rfile to buffered because otherwise it could be
445 # really slow for large data (a getc() call per byte); we make
446 # wfile unbuffered because (a) often after a write() we want to
447 # read and we need to flush the line; (b) big writes to unbuffered
448 # files are typically optimized by stdio even when big reads
450 self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
451 self.wfile = fileio_class(conn, mode="wb", bufsize=0)
453 self.client_addr = client_addr
455 self.request_headers = None
456 self.request_method = None
457 self.request_path = None
458 self.request_requestline = None
459 self.request_version = self.default_request_version
461 self.response_body = None
462 self.response_code = HTTP_OK
463 self.response_content_type = None
464 self.response_headers = {}
466 logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
471 # Read, parse and handle request
474 self._HandleRequest()
475 except HTTPException, err:
476 self._SetErrorStatus(err)
478 # Try to send a response
484 logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
487 if not self.wfile.closed:
def _DateTimeHeader(self):
  """Builds the current date and time for use in a message header.

  @rtype: string
  @return: Current time as reported by time.gmtime(), formatted as
      "<weekday>, <day> <month> <year> <hh>:<mm>:<ss> GMT"

  """
  now = time.gmtime()

  # Weekday and month names come from the module-level lookup tables
  fields = (WEEKDAYNAME[now.tm_wday], now.tm_mday, MONTHNAME[now.tm_mon],
            now.tm_year, now.tm_hour, now.tm_min, now.tm_sec)

  return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % fields
500 def _SetErrorStatus(self, err):
501 """Sets the response code and body from a HTTPException.
503 @type err: HTTPException
504 @param err: Exception instance
508 (shortmsg, longmsg) = self.responses[err.code]
510 shortmsg = longmsg = "Unknown"
513 message = err.message
519 "message": cgi.escape(message),
523 self.response_code = err.code
524 self.response_content_type = self.error_content_type
525 self.response_body = self.error_message_format % values
527 def _HandleRequest(self):
528 """Handle the actual request.
530 Calls the actual handler function and converts exceptions into HTTP errors.
533 # Don't do anything if there's already been a problem
534 if self.response_code != HTTP_OK:
537 assert self.request_method, "Status code %s requires a method" % HTTP_OK
539 # Check whether client is still there
544 result = self._server.HandleRequest(self)
547 encoder = HTTPJsonConverter()
548 body = encoder.Encode(result)
550 self.response_content_type = encoder.CONTENT_TYPE
551 self.response_body = body
552 except (HTTPException, KeyboardInterrupt, SystemExit):
554 except Exception, err:
555 logging.exception("Caught exception")
556 raise HTTPInternalError(message=str(err))
558 logging.exception("Unknown exception")
559 raise HTTPInternalError(message="Unknown error")
561 except HTTPException, err:
562 self._SetErrorStatus(err)
564 def _SendResponse(self):
565 """Sends response to the client.
568 # Check whether client is still there
571 logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
572 self.request_requestline, self.response_code)
574 if self.response_code in self.responses:
575 response_message = self.responses[self.response_code][0]
577 response_message = ""
579 if self.request_version != HTTP_0_9:
580 self.wfile.write("%s %d %s\r\n" %
581 (self.request_version, self.response_code,
583 self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
584 self._SendHeader(HTTP_DATE, self._DateTimeHeader())
585 self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
586 self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
587 for key, val in self.response_headers.iteritems():
588 self._SendHeader(key, val)
590 # We don't support keep-alive at this time
591 self._SendHeader(HTTP_CONNECTION, "close")
592 self.wfile.write("\r\n")
594 if (self.request_method != HTTP_HEAD and
595 self.response_code >= HTTP_OK and
596 self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
597 self.wfile.write(self.response_body)
def _SendHeader(self, name, value):
  """Writes a single "name: value" header line to the client.

  Nothing is written when the request version is HTTP_0_9.

  @param name: Header name
  @param value: Header value

  """
  if self.request_version == HTTP_0_9:
    return

  header_line = "%s: %s\r\n" % (name, value)
  self.wfile.write(header_line)
603 def _ReadRequest(self):
604 """Reads and parses request line
607 raw_requestline = self.rfile.readline()
609 requestline = raw_requestline
610 if requestline[-2:] == '\r\n':
611 requestline = requestline[:-2]
612 elif requestline[-1:] == '\n':
613 requestline = requestline[:-1]
616 raise HTTPBadRequest("Empty request line")
618 self.request_requestline = requestline
620 logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
622 words = requestline.split()
625 [method, path, version] = words
626 if version[:5] != 'HTTP/':
627 raise HTTPBadRequest("Bad request version (%r)" % version)
630 base_version_number = version.split('/', 1)[1]
631 version_number = base_version_number.split(".")
633 # RFC 2145 section 3.1 says there can be only one "." and
634 # - major and minor numbers MUST be treated as
636 # - HTTP/2.4 is a lower version than HTTP/2.13, which in
637 # turn is lower than HTTP/12.3;
638 # - Leading zeros MUST be ignored by recipients.
639 if len(version_number) != 2:
640 raise HTTPBadRequest("Bad request version (%r)" % version)
642 version_number = int(version_number[0]), int(version_number[1])
643 except (ValueError, IndexError):
644 raise HTTPBadRequest("Bad request version (%r)" % version)
646 if version_number >= (2, 0):
647 raise HTTPVersionNotSupported("Invalid HTTP Version (%s)" %
650 elif len(words) == 2:
652 [method, path] = words
653 if method != HTTP_GET:
654 raise HTTPBadRequest("Bad HTTP/0.9 request type (%r)" % method)
657 raise HTTPBadRequest("Bad request syntax (%r)" % requestline)
659 # Examine the headers and look for a Connection directive
660 headers = mimetools.Message(self.rfile, 0)
662 self.request_method = method
663 self.request_path = path
664 self.request_version = version
665 self.request_headers = headers
667 def _ReadPostData(self):
668 """Reads POST/PUT data
670 Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
671 a request is signaled by the inclusion of a Content-Length header field in
672 the request message headers. HTTP/1.0 requests containing an entity body
673 must include a valid Content-Length header field."
676 # While not according to specification, we only support an entity body for
678 if (not self.request_method or
679 self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
680 self.request_post_data = None
683 content_length = None
685 if HTTP_CONTENT_LENGTH in self.request_headers:
686 content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
692 # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
693 if content_length is None:
694 raise HTTPLengthRequired("Missing Content-Length header or"
697 data = self.rfile.read(content_length)
699 # TODO: Content-type, error handling
701 self.request_post_data = HTTPJsonConverter().Decode(data)
703 self.request_post_data = None
705 logging.debug("HTTP POST data: %s", self.request_post_data)
708 class HttpServer(_HttpSocketBase):
709 """Generic HTTP server class
711 Users of this class must subclass it and override the HandleRequest function.
716 def __init__(self, mainloop, local_address, port,
717 ssl_params=None, ssl_verify_peer=False):
718 """Initializes the HTTP server
720 @type mainloop: ganeti.daemon.Mainloop
721 @param mainloop: Mainloop used to poll for I/O events
722 @type local_address: string
723 @param local_address: Local IP address to bind to
725 @param port: TCP port to listen on
726 @type ssl_params: HttpSslParams
727 @param ssl_params: SSL key and certificate
728 @type ssl_verify_peer: bool
729 @param ssl_verify_peer: Whether to require client certificate and compare
730 it with our certificate
733 _HttpSocketBase.__init__(self)
735 self.mainloop = mainloop
736 self.local_address = local_address
739 self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
741 # Allow port to be reused
742 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
745 self._fileio_class = _SSLFileObject
747 self._fileio_class = socket._fileobject
751 mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
752 mainloop.RegisterSignal(self)
755 self.socket.bind((self.local_address, self.port))
756 self.socket.listen(5)
def OnIO(self, fd, condition):
  """Reacts to an I/O event reported for the server socket.

  @param fd: File descriptor the event was reported for (not used here)
  @param condition: Event bitmask (see select module)

  """
  if not condition & select.POLLIN:
    # Only readable events are of interest
    return

  self._IncomingConnection()
def OnSignal(self, signum):
  """Reacts to a received signal.

  Only SIGCHLD is handled; it triggers a quick check for finished child
  processes. All other signals are ignored.

  @param signum: Number of the received signal

  """
  if signum != signal.SIGCHLD:
    return

  self._CollectChildren(True)
769 def _CollectChildren(self, quick):
770 """Checks whether any child processes are done
773 @param quick: Whether to only use non-blocking functions
777 # Don't wait for other processes if it should be a quick check
778 while len(self._children) > self.MAX_CHILDREN:
780 # Waiting without a timeout brings us into a potential DoS situation.
781 # As soon as too many children run, we'll not respond to new
782 # requests. The real solution would be to add a timeout for children
783 # and killing them after some time.
784 pid, status = os.waitpid(0, 0)
787 if pid and pid in self._children:
788 self._children.remove(pid)
790 for child in self._children:
792 pid, status = os.waitpid(child, os.WNOHANG)
795 if pid and pid in self._children:
796 self._children.remove(pid)
798 def _IncomingConnection(self):
799 """Called for each incoming connection
802 (connection, client_addr) = self.socket.accept()
804 self._CollectChildren(False)
810 HttpServerRequestExecutor(self, connection, client_addr,
813 logging.exception("Error while handling request from %s:%s",
814 client_addr[0], client_addr[1])
818 self._children.append(pid)
def HandleRequest(self, req):
  """Handles a request; to be overridden by subclasses.

  @param req: Request to handle
  @raise NotImplementedError: Always, in this base implementation

  """
  raise NotImplementedError()
824 class HttpClientRequest(object):
825 def __init__(self, host, port, method, path, headers=None, post_data=None,
826 ssl_params=None, ssl_verify_peer=False):
827 """Describes an HTTP request.
830 @param host: Hostname
834 @param method: Method name
836 @param path: Request path
837 @type headers: dict or None
838 @param headers: Additional headers to send
839 @type post_data: string or None
840 @param post_data: Additional data to send
841 @type ssl_params: HttpSslParams
842 @param ssl_params: SSL key and certificate
843 @type ssl_verify_peer: bool
844 @param ssl_verify_peer: Whether to compare our certificate with server's
848 if post_data is not None:
849 assert method.upper() in (HTTP_POST, HTTP_PUT), \
850 "Only POST and GET requests support sending data"
852 assert path.startswith("/"), "Path must start with slash (/)"
856 self.ssl_params = ssl_params
857 self.ssl_verify_peer = ssl_verify_peer
860 self.headers = headers
861 self.post_data = post_data
866 self.resp_status_line = None
867 self.resp_version = None
868 self.resp_status = None
869 self.resp_reason = None
870 self.resp_headers = None
871 self.resp_body = None
874 class HttpClientRequestExecutor(_HttpSocketBase):
877 HTTP_USER_AGENT: HTTP_GANETI_VERSION,
878 # TODO: For keep-alive, don't send "Connection: close"
879 HTTP_CONNECTION: "close",
883 STATUS_LINE_LENGTH_MAX = 512
884 HEADER_LENGTH_MAX = 4 * 1024
886 # Timeouts in seconds for socket layer
887 # TODO: Make read timeout configurable per OpCode
888 CONNECT_TIMEOUT = 5.0
893 # Parser state machine
894 PS_STATUS_LINE = "status-line"
895 PS_HEADERS = "headers"
897 PS_COMPLETE = "complete"
899 def __init__(self, req):
900 """Initializes the HttpClientRequestExecutor class.
902 @type req: HttpClientRequest
903 @param req: Request object
906 _HttpSocketBase.__init__(self)
910 self.parser_status = self.PS_STATUS_LINE
911 self.header_buffer = StringIO()
912 self.body_buffer = StringIO()
913 self.content_length = None
914 self.server_will_close = None
916 self.poller = select.poll()
919 # TODO: Implement connection caching/keep-alive
920 self.sock = self._CreateSocket(req.ssl_params,
923 # Disable Python's timeout
924 self.sock.settimeout(None)
926 # Operate in non-blocking mode
927 self.sock.setblocking(0)
935 # Only wait for server to close if we didn't have any exception.
938 self._CloseConnection(force_close)
943 req.resp_body = self.body_buffer.getvalue()
948 except _HttpClientError, err:
952 def _BuildRequest(self):
953 """Build HTTP request.
956 @return: Complete request
960 send_headers = self.DEFAULT_HEADERS.copy()
962 if self.request.headers:
963 send_headers.update(self.request.headers)
965 send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
967 if self.request.post_data:
968 send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
972 # Add request line. We only support HTTP/1.0 (no chunked transfers and no
974 # TODO: For keep-alive, change to HTTP/1.1
975 buf.write("%s %s %s\r\n" % (self.request.method.upper(),
976 self.request.path, HTTP_1_0))
979 for name, value in send_headers.iteritems():
980 buf.write("%s: %s\r\n" % (name, value))
984 if self.request.post_data:
985 buf.write(self.request.post_data)
987 return buf.getvalue()
989 def _ParseStatusLine(self):
990 """Parses the status line sent by the server.
993 line = self.request.resp_status_line
996 raise _HttpClientError("Empty status line")
999 [version, status, reason] = line.split(None, 2)
1002 [version, status] = line.split(None, 1)
1008 version = version.upper()
1010 if version not in (HTTP_1_0, HTTP_1_1):
1011 # We do not support HTTP/0.9, despite the specification requiring it
1012 # (RFC2616, section 19.6)
1013 raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
1016 # The status code is a three-digit number
1018 status = int(status)
1019 if status < 100 or status > 999:
1025 raise _HttpClientError("Invalid status code (%r)" % line)
1027 self.request.resp_version = version
1028 self.request.resp_status = status
1029 self.request.resp_reason = reason
1031 def _WillServerCloseConnection(self):
1032 """Evaluate whether server will close the connection.
1035 @return: Whether server will close the connection
1038 hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
1040 hdr_connection = hdr_connection.lower()
1042 # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
1043 if self.request.resp_version == HTTP_1_1:
1044 return (hdr_connection and "close" in hdr_connection)
1046 # Some HTTP/1.0 implementations have support for persistent connections,
1047 # using rules different than HTTP/1.1.
1049 # For older HTTP, Keep-Alive indicates persistent connection.
1050 if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
1053 # At least Akamai returns a "Connection: Keep-Alive" header, which was
1054 # supposed to be sent by the client.
1055 if hdr_connection and "keep-alive" in hdr_connection:
1060 def _ParseHeaders(self):
1061 """Parses the headers sent by the server.
1063 This function also adjusts internal variables based on the header values.
1069 self.header_buffer.seek(0, 0)
1070 req.resp_headers = mimetools.Message(self.header_buffer, 0)
1072 self.server_will_close = self._WillServerCloseConnection()
1074 # Do we have a Content-Length header?
1075 hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
1076 if hdr_content_length:
1078 self.content_length = int(hdr_content_length)
1081 if self.content_length is not None and self.content_length < 0:
1082 self.content_length = None
1084 # does the body have a fixed length? (of zero)
1085 if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
1086 100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
1087 self.content_length = 0
1089 # if the connection remains open and a content-length was not provided,
1090 # then assume that the connection WILL close.
1091 if self.content_length is None:
1092 self.server_will_close = True
def _CheckStatusLineLength(self, length):
  """Verifies that the status line is not longer than allowed.

  @param length: Length to compare against STATUS_LINE_LENGTH_MAX
  @raise _HttpClientError: If length exceeds STATUS_LINE_LENGTH_MAX

  """
  limit = self.STATUS_LINE_LENGTH_MAX
  if length <= limit:
    return

  raise _HttpClientError("Status line longer than %d chars" % limit)
def _CheckHeaderLength(self, length):
  """Verifies that the headers are not longer than allowed.

  @param length: Length to compare against HEADER_LENGTH_MAX
  @raise _HttpClientError: If length exceeds HEADER_LENGTH_MAX

  """
  limit = self.HEADER_LENGTH_MAX
  if length <= limit:
    return

  raise _HttpClientError("Headers longer than %d chars" % limit)
1104 def _ParseBuffer(self, buf, eof):
1105 """Main function for HTTP response state machine.
1108 @param buf: Receive buffer
1110 @param eof: Whether we've reached EOF on the socket
1112 @return: Updated receive buffer
1115 if self.parser_status == self.PS_STATUS_LINE:
1116 # Expect status line
1117 idx = buf.find("\r\n")
1119 self.request.resp_status_line = buf[:idx]
1121 self._CheckStatusLineLength(len(self.request.resp_status_line))
1123 # Remove status line, including CRLF
1126 self._ParseStatusLine()
1128 self.parser_status = self.PS_HEADERS
1130 # Check whether incoming data is getting too large, otherwise we just
1131 # fill our read buffer.
1132 self._CheckStatusLineLength(len(buf))
1134 if self.parser_status == self.PS_HEADERS:
1135 # Wait for header end
1136 idx = buf.find("\r\n\r\n")
1138 self.header_buffer.write(buf[:idx + 2])
1140 self._CheckHeaderLength(self.header_buffer.tell())
1142 # Remove headers, including CRLF
1145 self._ParseHeaders()
1147 self.parser_status = self.PS_BODY
1149 # Check whether incoming data is getting too large, otherwise we just
1150 # fill our read buffer.
1151 self._CheckHeaderLength(len(buf))
1153 if self.parser_status == self.PS_BODY:
1154 self.body_buffer.write(buf)
1157 # Check whether we've read everything
1159 (self.content_length is not None and
1160 self.body_buffer.tell() >= self.content_length)):
1161 self.parser_status = self.PS_COMPLETE
1166 """Non-blocking connect to host with timeout.
1172 connect_error = self.sock.connect_ex((self.request.host,
1174 except socket.gaierror, err:
1175 raise _HttpClientError("Connection failed: %s" % str(err))
1177 if connect_error == errno.EINTR:
1181 elif connect_error == 0:
1182 # Connection established
1186 elif connect_error == errno.EINPROGRESS:
1187 # Connection started
1190 raise _HttpClientError("Connection failed (%s: %s)" %
1191 (connect_error, os.strerror(connect_error)))
1194 # Wait for connection
1195 event = WaitForSocketCondition(self.poller, self.sock,
1196 select.POLLOUT, self.CONNECT_TIMEOUT)
1198 raise _HttpClientError("Timeout while connecting to server")
1201 connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1202 if connect_error != 0:
1203 raise _HttpClientError("Connection failed (%s: %s)" %
1204 (connect_error, os.strerror(connect_error)))
1206 # Enable TCP keep-alive
1207 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1209 # If needed, Linux specific options are available to change the TCP
1210 # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1213 def _SendRequest(self):
1214 """Sends request to server.
1217 buf = self._BuildRequest()
1220 # Send only 4 KB at a time
1224 sent = SocketOperation(self.poller, self.sock, SOCKOP_SEND, data,
1226 except _HttpSocketTimeout:
1227 raise _HttpClientError("Timeout while sending request")
1228 except socket.error, err:
1229 raise _HttpClientError("Error sending request: %s" % err)
1234 assert not buf, "Request wasn't sent completely"
1236 def _ReadResponse(self):
1237 """Read response from server.
1239 Calls the parser function after reading a chunk of data.
1244 while self.parser_status != self.PS_COMPLETE:
1246 data = SocketOperation(self.poller, self.sock, SOCKOP_RECV, 4096,
1248 except _HttpSocketTimeout:
1249 raise _HttpClientError("Timeout while reading response")
1250 except socket.error, err:
1251 raise _HttpClientError("Error while reading response: %s" % err)
1258 # Do some parsing and error checking while more data arrives
1259 buf = self._ParseBuffer(buf, eof)
1261 # Must be done only after the buffer has been evaluated
1263 self.parser_status in (self.PS_STATUS_LINE,
1265 raise _HttpClientError("Connection closed prematurely")
1268 buf = self._ParseBuffer(buf, True)
1270 assert self.parser_status == self.PS_COMPLETE
1271 assert not buf, "Parser didn't read full response"
1273 def _CloseConnection(self, force):
1274 """Closes the connection.
1277 if self.server_will_close and not force:
1278 # Wait for server to close
1280 # Check whether it's actually closed
1281 if not SocketOperation(self.poller, self.sock, SOCKOP_RECV, 1,
1282 self.CLOSE_TIMEOUT):
1284 except (socket.error, _HttpClientError, _HttpSocketTimeout):
1285 # Ignore errors at this stage
1288 # Close the connection from our side
1290 SocketOperation(self.poller, self.sock, SOCKOP_SHUTDOWN,
1291 socket.SHUT_RDWR, self.WRITE_TIMEOUT)
1292 except _HttpSocketTimeout:
1293 raise _HttpClientError("Timeout while shutting down connection")
1294 except socket.error, err:
1295 raise _HttpClientError("Error while shutting down connection: %s" % err)
1298 class _HttpClientPendingRequest(object):
1299 """Data class for pending requests.
1302 def __init__(self, request):
1303 self.request = request
1305 # Thread synchronization
1306 self.done = threading.Event()
1309 class HttpClientWorker(workerpool.BaseWorker):
1310 """HTTP client worker class.
1313 def RunTask(self, pend_req):
1315 HttpClientRequestExecutor(pend_req.request)
1320 class HttpClientWorkerPool(workerpool.WorkerPool):
1321 def __init__(self, manager):
1322 workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
1324 self.manager = manager
1327 class HttpClientManager(object):
1328 """Manages HTTP requests.
1332 self._wpool = HttpClientWorkerPool(self)
1337 def ExecRequests(self, requests):
1338 """Execute HTTP requests.
1340 This function can be called from multiple threads at the same time.
1342 @type requests: List of HttpClientRequest instances
1343 @param requests: The requests to execute
1344 @rtype: List of HttpClientRequest instances
1345 @returns: The list of requests passed in
1348 # _HttpClientPendingRequest is used for internal thread synchronization
1349 pending = [_HttpClientPendingRequest(req) for req in requests]
1352 # Add requests to queue
1353 for pend_req in pending:
1354 self._wpool.AddTask(pend_req)
1357 # In case of an exception we should still wait for the rest, otherwise
1358 # another thread from the worker pool could modify the request object
1359 # after we returned.
1361 # And wait for them to finish
1362 for pend_req in pending:
1363 pend_req.done.wait()
1365 # Return original list
1369 self._wpool.Quiesce()
1370 self._wpool.TerminateWorkers()
1373 class _SSLFileObject(object):
1374 """Wrapper around socket._fileobject
1376 This wrapper is required to handle OpenSSL exceptions.
1379 def _RequireOpenSocket(fn):
1380 def wrapper(self, *args, **kwargs):
1382 raise SocketClosed("Socket is closed")
1383 return fn(self, *args, **kwargs)
1386 def __init__(self, sock, mode='rb', bufsize=-1):
1387 self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1389 def _ConnectionLost(self):
1392 def _getclosed(self):
1393 return self._base is None or self._base.closed
1394 closed = property(_getclosed, doc="True if the file is closed")
1398 return self._base.close()
1402 return self._base.flush()
1406 return self._base.fileno()
1409 def read(self, size=-1):
1410 return self._ReadWrapper(self._base.read, size=size)
1413 def readline(self, size=-1):
1414 return self._ReadWrapper(self._base.readline, size=size)
1416 def _ReadWrapper(self, fn, *args, **kwargs):
1419 return fn(*args, **kwargs)
1421 except OpenSSL.SSL.ZeroReturnError, err:
1422 self._ConnectionLost()
1425 except OpenSSL.SSL.WantReadError:
1428 #except OpenSSL.SSL.WantWriteError:
1431 except OpenSSL.SSL.SysCallError, (retval, desc):
1432 if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
1434 self._ConnectionLost()
1437 logging.exception("Error in OpenSSL")
1438 self._ConnectionLost()
1439 raise socket.error(err.args)
1441 except OpenSSL.SSL.Error, err:
1442 self._ConnectionLost()
1443 raise socket.error(err.args)
1446 def write(self, data):
1447 return self._WriteWrapper(self._base.write, data)
1449 def _WriteWrapper(self, fn, *args, **kwargs):
1452 return fn(*args, **kwargs)
1453 except OpenSSL.SSL.ZeroReturnError, err:
1454 self._ConnectionLost()
1457 except OpenSSL.SSL.WantWriteError:
1460 #except OpenSSL.SSL.WantReadError:
1463 except OpenSSL.SSL.SysCallError, err:
1464 if err.args[0] == -1 and data == "":
1465 # errors when writing empty strings are expected
1466 # and can be ignored
1469 self._ConnectionLost()
1470 raise socket.error(err.args)
1472 except OpenSSL.SSL.Error, err:
1473 self._ConnectionLost()
1474 raise socket.error(err.args)