3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 # General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 """HTTP server module.
37 from cStringIO import StringIO
39 from ganeti import constants
40 from ganeti import serializer
41 from ganeti import workerpool
42 from ganeti import utils
45 HTTP_CLIENT_THREADS = 10
47 HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
49 WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
51 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
52 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
54 # Default error message
55 DEFAULT_ERROR_CONTENT_TYPE = "text/html"
56 DEFAULT_ERROR_MESSAGE = """\
58 <title>Error response</title>
61 <h1>Error response</h1>
62 <p>Error code %(code)d.
63 <p>Message: %(message)s.
64 <p>Error code explanation: %(code)s = %(explain)s.
70 HTTP_NOT_MODIFIED = 304
83 HTTP_SERVER = "Server"
85 HTTP_USER_AGENT = "User-Agent"
86 HTTP_CONTENT_TYPE = "Content-Type"
87 HTTP_CONTENT_LENGTH = "Content-Length"
88 HTTP_CONNECTION = "Connection"
89 HTTP_KEEP_ALIVE = "Keep-Alive"
91 _SSL_UNEXPECTED_EOF = "Unexpected EOF"
94 class SocketClosed(socket.error):
98 class _HttpClientError(Exception):
99 """Internal exception for HTTP client errors.
101 This should only be used for internal error reporting.
107 class HTTPException(Exception):
111 def __init__(self, message=None):
112 Exception.__init__(self)
113 if message is not None:
114 self.message = message
117 class HTTPBadRequest(HTTPException):
121 class HTTPForbidden(HTTPException):
125 class HTTPNotFound(HTTPException):
129 class HTTPGone(HTTPException):
133 class HTTPLengthRequired(HTTPException):
137 class HTTPInternalError(HTTPException):
141 class HTTPNotImplemented(HTTPException):
145 class HTTPServiceUnavailable(HTTPException):
149 class HTTPVersionNotSupported(HTTPException):
154 """Utility class to write HTTP server log files.
156 The written format is the "Common Log Format" as defined by Apache:
157 http://httpd.apache.org/docs/2.2/mod/mod_log_config.html#examples
160 def __init__(self, fd):
161 """Constructor for ApacheLogfile class.
164 - fd: Open file object
169 def LogRequest(self, request, format, *args):
170 self._fd.write("%s %s %s [%s] %s\n" % (
171 # Remote host address
172 request.address_string(),
174 # RFC1413 identity (identd)
181 self._FormatCurrentTime(),
188 def _FormatCurrentTime(self):
189 """Formats current time in Common Log Format.
192 return self._FormatLogTime(time.time())
def _FormatLogTime(self, seconds):
  """Render a timestamp in the layout used by the Common Log Format.

  The time is always expressed in UTC. The month abbreviation is looked up
  in MONTHNAME instead of being produced by strftime.

  @param seconds: Time in seconds since the epoch
  @return: Timestamp as produced by time.strftime

  """
  utc = time.gmtime(seconds)
  month_abbrev = MONTHNAME[utc[1]]
  pattern = "%d/" + month_abbrev + "/%Y:%H:%M:%S +0000"
  return time.strftime(pattern, utc)
class HTTPJsonConverter:
  """Converts message bodies to and from JSON.

  Both directions are delegated to the serializer module.

  """
  CONTENT_TYPE = "application/json"

  def Encode(self, data):
    """Return data serialized by serializer.DumpJson.

    """
    encoded = serializer.DumpJson(data)
    return encoded

  def Decode(self, data):
    """Return data deserialized by serializer.LoadJson.

    """
    decoded = serializer.LoadJson(data)
    return decoded
218 class _HttpSocketBase(object):
219 """Base class for HTTP server and client.
223 self._using_ssl = None
224 self._ssl_cert = None
227 def _CreateSocket(self, ssl_key_path, ssl_cert_path, ssl_verify_peer):
228 """Creates a TCP socket and initializes SSL if needed.
230 @type ssl_key_path: string
231 @param ssl_key_path: Path to file containing SSL key in PEM format
232 @type ssl_cert_path: string
233 @param ssl_cert_path: Path to file containing SSL certificate in PEM format
234 @type ssl_verify_peer: bool
235 @param ssl_verify_peer: Whether to require client certificate and compare
236 it with our certificate
239 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
241 # Should we enable SSL?
242 self._using_ssl = (ssl_cert_path and ssl_key_path)
244 if not self._using_ssl:
247 ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
248 ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
250 ssl_key_pem = utils.ReadFile(ssl_key_path)
251 ssl_cert_pem = utils.ReadFile(ssl_cert_path)
254 self._ssl_cert = cr.load_certificate(cr.FILETYPE_PEM, ssl_cert_pem)
255 self._ssl_key = cr.load_privatekey(cr.FILETYPE_PEM, ssl_key_pem)
258 ctx.use_privatekey(self._ssl_key)
259 ctx.use_certificate(self._ssl_cert)
260 ctx.check_privatekey()
263 ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
264 OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
265 self._SSLVerifyCallback)
267 return OpenSSL.SSL.Connection(ctx, sock)
269 def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
270 """Verify the certificate provided by the peer
272 We only compare fingerprints. The client must use the same certificate as
276 assert self._ssl_cert and self._ssl_key, "SSL not initialized"
278 return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
279 self._ssl_cert.digest("md5") == cert.digest("md5"))
282 class _HttpConnectionHandler(object):
283 """Implements server side of HTTP
285 This class implements the server side of HTTP. It's based on code of Python's
286 BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
287 character encodings. Keep-alive connections are not supported.
290 # The default request version. This only affects responses up until
291 # the point where the request line is parsed, so it mainly decides what
292 # the client gets back when sending a malformed request line.
293 # Most web servers default to HTTP 0.9, i.e. don't send a status line.
294 default_request_version = HTTP_0_9
296 # Error message settings
297 error_message_format = DEFAULT_ERROR_MESSAGE
298 error_content_type = DEFAULT_ERROR_CONTENT_TYPE
300 responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
302 def __init__(self, server, conn, client_addr, fileio_class):
303 """Initializes this class.
305 Part of the initialization is reading the request and eventual POST/PUT
306 data sent by the client.
309 self._server = server
311 # We default rfile to buffered because otherwise it could be
312 # really slow for large data (a getc() call per byte); we make
313 # wfile unbuffered because (a) often after a write() we want to
314 # read and we need to flush the line; (b) big writes to unbuffered
315 # files are typically optimized by stdio even when big reads
317 self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
318 self.wfile = fileio_class(conn, mode="wb", bufsize=0)
320 self.client_addr = client_addr
322 self.request_headers = None
323 self.request_method = None
324 self.request_path = None
325 self.request_requestline = None
326 self.request_version = self.default_request_version
328 self.response_body = None
329 self.response_code = HTTP_OK
330 self.response_content_type = None
331 self.response_headers = {}
333 self.should_fork = False
338 except HTTPException, err:
339 self._SetErrorStatus(err)
342 if not self.wfile.closed:
def _DateTimeHeader(self):
  """Build the current UTC date and time for use in a message header.

  Weekday and month abbreviations are taken from WEEKDAYNAME and MONTHNAME.

  """
  now = time.gmtime()
  date_part = "%s, %02d %3s %4d" % (WEEKDAYNAME[now[6]], now[2],
                                    MONTHNAME[now[1]], now[0])
  time_part = "%02d:%02d:%02d GMT" % (now[3], now[4], now[5])
  return date_part + " " + time_part
355 def _SetErrorStatus(self, err):
356 """Sets the response code and body from a HTTPException.
358 @type err: HTTPException
359 @param err: Exception instance
363 (shortmsg, longmsg) = self.responses[err.code]
365 shortmsg = longmsg = "Unknown"
368 message = err.message
374 "message": cgi.escape(message),
378 self.response_code = err.code
379 self.response_content_type = self.error_content_type
380 self.response_body = self.error_message_format % values
382 def HandleRequest(self):
383 """Handle the actual request.
385 Calls the actual handler function and converts exceptions into HTTP errors.
388 # Don't do anything if there's already been a problem
389 if self.response_code != HTTP_OK:
392 assert self.request_method, "Status code %s requires a method" % HTTP_OK
394 # Check whether client is still there
399 result = self._server.HandleRequest(self)
402 encoder = HTTPJsonConverter()
403 body = encoder.Encode(result)
405 self.response_content_type = encoder.CONTENT_TYPE
406 self.response_body = body
407 except (HTTPException, KeyboardInterrupt, SystemExit):
409 except Exception, err:
410 logging.exception("Caught exception")
411 raise HTTPInternalError(message=str(err))
413 logging.exception("Unknown exception")
414 raise HTTPInternalError(message="Unknown error")
416 except HTTPException, err:
417 self._SetErrorStatus(err)
419 def SendResponse(self):
420 """Sends response to the client.
423 # Check whether client is still there
426 logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
427 self.request_requestline, self.response_code)
429 if self.response_code in self.responses:
430 response_message = self.responses[self.response_code][0]
432 response_message = ""
434 if self.request_version != HTTP_0_9:
435 self.wfile.write("%s %d %s\r\n" %
436 (self.request_version, self.response_code,
438 self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
439 self._SendHeader(HTTP_DATE, self._DateTimeHeader())
440 self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
441 self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
442 for key, val in self.response_headers.iteritems():
443 self._SendHeader(key, val)
445 # We don't support keep-alive at this time
446 self._SendHeader(HTTP_CONNECTION, "close")
447 self.wfile.write("\r\n")
449 if (self.request_method != HTTP_HEAD and
450 self.response_code >= HTTP_OK and
451 self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
452 self.wfile.write(self.response_body)
def _SendHeader(self, name, value):
  """Write a single "name: value" header line to the client.

  Nothing is written when the request version is HTTP/0.9.

  """
  if self.request_version == HTTP_0_9:
    return
  header_line = "%s: %s\r\n" % (name, value)
  self.wfile.write(header_line)
458 def _ReadRequest(self):
459 """Reads and parses request line
462 raw_requestline = self.rfile.readline()
464 requestline = raw_requestline
465 if requestline[-2:] == '\r\n':
466 requestline = requestline[:-2]
467 elif requestline[-1:] == '\n':
468 requestline = requestline[:-1]
471 raise HTTPBadRequest("Empty request line")
473 self.request_requestline = requestline
475 logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
477 words = requestline.split()
480 [method, path, version] = words
481 if version[:5] != 'HTTP/':
482 raise HTTPBadRequest("Bad request version (%r)" % version)
485 base_version_number = version.split('/', 1)[1]
486 version_number = base_version_number.split(".")
488 # RFC 2145 section 3.1 says there can be only one "." and
489 # - major and minor numbers MUST be treated as
491 # - HTTP/2.4 is a lower version than HTTP/2.13, which in
492 # turn is lower than HTTP/12.3;
493 # - Leading zeros MUST be ignored by recipients.
494 if len(version_number) != 2:
495 raise HTTPBadRequest("Bad request version (%r)" % version)
497 version_number = int(version_number[0]), int(version_number[1])
498 except (ValueError, IndexError):
499 raise HTTPBadRequest("Bad request version (%r)" % version)
501 if version_number >= (2, 0):
502 raise HTTPVersionNotSupported("Invalid HTTP Version (%s)" %
505 elif len(words) == 2:
507 [method, path] = words
508 if method != HTTP_GET:
509 raise HTTPBadRequest("Bad HTTP/0.9 request type (%r)" % method)
512 raise HTTPBadRequest("Bad request syntax (%r)" % requestline)
514 # Examine the headers and look for a Connection directive
515 headers = mimetools.Message(self.rfile, 0)
517 self.request_method = method
518 self.request_path = path
519 self.request_version = version
520 self.request_headers = headers
522 def _ReadPostData(self):
523 """Reads POST/PUT data
525 Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
526 a request is signaled by the inclusion of a Content-Length header field in
527 the request message headers. HTTP/1.0 requests containing an entity body
528 must include a valid Content-Length header field."
531 # While not according to specification, we only support an entity body for
533 if (not self.request_method or
534 self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
535 self.request_post_data = None
538 content_length = None
540 if HTTP_CONTENT_LENGTH in self.request_headers:
541 content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
547 # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
548 if content_length is None:
549 raise HTTPLengthRequired("Missing Content-Length header or"
552 data = self.rfile.read(content_length)
554 # TODO: Content-type, error handling
556 self.request_post_data = HTTPJsonConverter().Decode(data)
558 self.request_post_data = None
560 logging.debug("HTTP POST data: %s", self.request_post_data)
563 class HttpServer(_HttpSocketBase):
564 """Generic HTTP server class
566 Users of this class must subclass it and override the HandleRequest function.
571 def __init__(self, mainloop, local_address, port,
572 ssl_key_path=None, ssl_cert_path=None, ssl_verify_peer=False):
573 """Initializes the HTTP server
575 @type mainloop: ganeti.daemon.Mainloop
576 @param mainloop: Mainloop used to poll for I/O events
577 @type local_addess: string
578 @param local_address: Local IP address to bind to
580 @param port: TCP port to listen on
581 @type ssl_key_path: string
582 @param ssl_key_path: Path to file containing SSL key in PEM format
583 @type ssl_cert_path: string
584 @param ssl_cert_path: Path to file containing SSL certificate in PEM format
585 @type ssl_verify_peer: bool
586 @param ssl_verify_peer: Whether to require client certificate and compare
587 it with our certificate
590 _HttpSocketBase.__init__(self)
592 self.mainloop = mainloop
593 self.local_address = local_address
596 self.socket = self._CreateSocket(ssl_key_path, ssl_cert_path, ssl_verify_peer)
598 # Allow port to be reused
599 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
602 self._fileio_class = _SSLFileObject
604 self._fileio_class = socket._fileobject
608 mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
609 mainloop.RegisterSignal(self)
612 self.socket.bind((self.local_address, self.port))
613 self.socket.listen(5)
def OnIO(self, fd, condition):
  """Accept a pending connection when the condition includes POLLIN.

  """
  readable = condition & select.POLLIN
  if not readable:
    return
  self._IncomingConnection()
def OnSignal(self, signum):
  """Run a quick child collection when SIGCHLD is delivered.

  Any other signal number is ignored.

  """
  if signum != signal.SIGCHLD:
    return
  self._CollectChildren(True)
626 def _CollectChildren(self, quick):
627 """Checks whether any child processes are done
630 @param quick: Whether to only use non-blocking functions
634 # Don't wait for other processes if it should be a quick check
635 while len(self._children) > self.MAX_CHILDREN:
637 # Waiting without a timeout brings us into a potential DoS situation.
638 # As soon as too many children run, we'll not respond to new
639 # requests. The real solution would be to add a timeout for children
640 # and killing them after some time.
641 pid, status = os.waitpid(0, 0)
644 if pid and pid in self._children:
645 self._children.remove(pid)
647 for child in self._children:
649 pid, status = os.waitpid(child, os.WNOHANG)
652 if pid and pid in self._children:
653 self._children.remove(pid)
655 def _IncomingConnection(self):
656 """Called for each incoming connection
659 (connection, client_addr) = self.socket.accept()
661 self._CollectChildren(False)
666 logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
673 # Read, parse and handle request
674 handler = _HttpConnectionHandler(self, connection, client_addr,
676 handler.HandleRequest()
678 # Try to send a response
680 handler.SendResponse()
685 logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
687 logging.exception("Error while handling request from %s:%s",
688 client_addr[0], client_addr[1])
692 self._children.append(pid)
def HandleRequest(self, req):
  """Handle a request; this base implementation always raises.

  @param req: Request object passed in by the caller
  @raise NotImplementedError: Always

  """
  raise NotImplementedError()
698 class HttpClientRequest(object):
699 def __init__(self, host, port, method, path, headers=None, post_data=None,
700 ssl_key_path=None, ssl_cert_path=None, ssl_verify_peer=False):
701 """Describes an HTTP request.
704 @param host: Hostname
708 @param method: Method name
710 @param path: Request path
711 @type headers: dict or None
712 @param headers: Additional headers to send
713 @type post_data: string or None
714 @param post_data: Additional data to send
717 if post_data is not None:
718 assert method.upper() in (HTTP_POST, HTTP_PUT), \
719 "Only POST and GET requests support sending data"
721 assert path.startswith("/"), "Path must start with slash (/)"
725 self.ssl_key_path = ssl_key_path
726 self.ssl_cert_path = ssl_cert_path
727 self.ssl_verify_peer = ssl_verify_peer
730 self.headers = headers
731 self.post_data = post_data
736 self.resp_status_line = None
737 self.resp_version = None
738 self.resp_status = None
739 self.resp_reason = None
740 self.resp_headers = None
741 self.resp_body = None
744 class HttpClientRequestExecutor(_HttpSocketBase):
747 HTTP_USER_AGENT: HTTP_GANETI_VERSION,
748 # TODO: For keep-alive, don't send "Connection: close"
749 HTTP_CONNECTION: "close",
753 STATUS_LINE_LENGTH_MAX = 512
754 HEADER_LENGTH_MAX = 4 * 1024
756 # Timeouts in seconds for socket layer
757 # TODO: Make read timeout configurable per OpCode
758 CONNECT_TIMEOUT = 5.0
763 # Parser state machine
764 PS_STATUS_LINE = "status-line"
765 PS_HEADERS = "headers"
767 PS_COMPLETE = "complete"
773 OP_SHUTDOWN) = range(4)
775 def __init__(self, req):
776 """Initializes the HttpClientRequestExecutor class.
778 @type req: HttpClientRequest
779 @param req: Request object
782 _HttpSocketBase.__init__(self)
786 self.parser_status = self.PS_STATUS_LINE
787 self.header_buffer = StringIO()
788 self.body_buffer = StringIO()
789 self.content_length = None
790 self.server_will_close = None
792 self.poller = select.poll()
795 # TODO: Implement connection caching/keep-alive
796 self.sock = self._CreateSocket(req.ssl_key_path,
800 # Disable Python's timeout
801 self.sock.settimeout(None)
803 # Operate in non-blocking mode
804 self.sock.setblocking(0)
812 # Only wait for server to close if we didn't have any exception.
815 self._CloseConnection(force_close)
820 req.resp_body = self.body_buffer.getvalue()
825 except _HttpClientError, err:
829 def _BuildRequest(self):
830 """Build HTTP request.
833 @return: Complete request
837 send_headers = self.DEFAULT_HEADERS.copy()
839 if self.request.headers:
840 send_headers.update(self.request.headers)
842 send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
844 if self.request.post_data:
845 send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
849 # Add request line. We only support HTTP/1.0 (no chunked transfers and no
851 # TODO: For keep-alive, change to HTTP/1.1
852 buf.write("%s %s %s\r\n" % (self.request.method.upper(),
853 self.request.path, HTTP_1_0))
856 for name, value in send_headers.iteritems():
857 buf.write("%s: %s\r\n" % (name, value))
861 if self.request.post_data:
862 buf.write(self.request.post_data)
864 return buf.getvalue()
866 def _ParseStatusLine(self):
867 """Parses the status line sent by the server.
870 line = self.request.resp_status_line
873 raise _HttpClientError("Empty status line")
876 [version, status, reason] = line.split(None, 2)
879 [version, status] = line.split(None, 1)
885 version = version.upper()
887 if version not in (HTTP_1_0, HTTP_1_1):
888 # We do not support HTTP/0.9, despite the specification requiring it
889 # (RFC2616, section 19.6)
890 raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
893 # The status code is a three-digit number
896 if status < 100 or status > 999:
902 raise _HttpClientError("Invalid status code (%r)" % line)
904 self.request.resp_version = version
905 self.request.resp_status = status
906 self.request.resp_reason = reason
908 def _WillServerCloseConnection(self):
909 """Evaluate whether server will close the connection.
912 @return: Whether server will close the connection
915 hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
917 hdr_connection = hdr_connection.lower()
919 # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
920 if self.request.resp_version == HTTP_1_1:
921 return (hdr_connection and "close" in hdr_connection)
923 # Some HTTP/1.0 implementations have support for persistent connections,
924 # using rules different than HTTP/1.1.
926 # For older HTTP, Keep-Alive indicates persistent connection.
927 if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
930 # At least Akamai returns a "Connection: Keep-Alive" header, which was
931 # supposed to be sent by the client.
932 if hdr_connection and "keep-alive" in hdr_connection:
937 def _ParseHeaders(self):
938 """Parses the headers sent by the server.
940 This function also adjusts internal variables based on the header values.
946 self.header_buffer.seek(0, 0)
947 req.resp_headers = mimetools.Message(self.header_buffer, 0)
949 self.server_will_close = self._WillServerCloseConnection()
951 # Do we have a Content-Length header?
952 hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
953 if hdr_content_length:
955 self.content_length = int(hdr_content_length)
958 if self.content_length is not None and self.content_length < 0:
959 self.content_length = None
961 # does the body have a fixed length? (of zero)
962 if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
963 100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
964 self.content_length = 0
966 # if the connection remains open and a content-length was not provided,
967 # then assume that the connection WILL close.
968 if self.content_length is None:
969 self.server_will_close = True
971 def _CheckStatusLineLength(self, length):
972 if length > self.STATUS_LINE_LENGTH_MAX:
973 raise _HttpClientError("Status line longer than %d chars" %
974 self.STATUS_LINE_LENGTH_MAX)
976 def _CheckHeaderLength(self, length):
977 if length > self.HEADER_LENGTH_MAX:
978 raise _HttpClientError("Headers longer than %d chars" %
979 self.HEADER_LENGTH_MAX)
981 def _ParseBuffer(self, buf, eof):
982 """Main function for HTTP response state machine.
985 @param buf: Receive buffer
987 @param eof: Whether we've reached EOF on the socket
989 @return: Updated receive buffer
992 if self.parser_status == self.PS_STATUS_LINE:
994 idx = buf.find("\r\n")
996 self.request.resp_status_line = buf[:idx]
998 self._CheckStatusLineLength(len(self.request.resp_status_line))
1000 # Remove status line, including CRLF
1003 self._ParseStatusLine()
1005 self.parser_status = self.PS_HEADERS
1007 # Check whether incoming data is getting too large, otherwise we just
1008 # fill our read buffer.
1009 self._CheckStatusLineLength(len(buf))
1011 if self.parser_status == self.PS_HEADERS:
1012 # Wait for header end
1013 idx = buf.find("\r\n\r\n")
1015 self.header_buffer.write(buf[:idx + 2])
1017 self._CheckHeaderLength(self.header_buffer.tell())
1019 # Remove headers, including CRLF
1022 self._ParseHeaders()
1024 self.parser_status = self.PS_BODY
1026 # Check whether incoming data is getting too large, otherwise we just
1027 # fill our read buffer.
1028 self._CheckHeaderLength(len(buf))
1030 if self.parser_status == self.PS_BODY:
1031 self.body_buffer.write(buf)
1034 # Check whether we've read everything
1036 (self.content_length is not None and
1037 self.body_buffer.tell() >= self.content_length)):
1038 self.parser_status = self.PS_COMPLETE
1042 def _WaitForCondition(self, event, timeout):
1043 """Waits for a condition to occur on the socket.
1046 @param event: ORed condition (see select module)
1047 @type timeout: float or None
1048 @param timeout: Timeout in seconds
1050 @return: None for timeout, otherwise occured conditions
1053 check = (event | select.POLLPRI |
1054 select.POLLNVAL | select.POLLHUP | select.POLLERR)
1056 if timeout is not None:
1057 # Poller object expects milliseconds
1060 self.poller.register(self.sock, event)
1063 # TODO: If the main thread receives a signal and we have no timeout, we
1064 # could wait forever. This should check a global "quit" flag or
1065 # something every so often.
1066 io_events = self.poller.poll(timeout)
1068 for (evfd, evcond) in io_events:
1075 self.poller.unregister(self.sock)
1077 def _SocketOperation(self, op, arg1, error_msg, timeout_msg):
1078 """Wrapper around socket functions.
1080 This function abstracts error handling for socket operations, especially
1081 for the complicated interaction with OpenSSL.
1084 if op == self.OP_SEND:
1085 event_poll = select.POLLOUT
1086 event_check = select.POLLOUT
1087 timeout = self.WRITE_TIMEOUT
1089 elif op in (self.OP_RECV, self.OP_CLOSE_CHECK):
1090 event_poll = select.POLLIN
1091 event_check = select.POLLIN | select.POLLPRI
1092 if op == self.OP_CLOSE_CHECK:
1093 timeout = self.CLOSE_TIMEOUT
1095 timeout = self.READ_TIMEOUT
1097 elif op == self.OP_SHUTDOWN:
1101 # The timeout is only used when OpenSSL requests polling for a condition.
1102 # It is not advisable to have no timeout for shutdown.
1103 timeout = self.WRITE_TIMEOUT
1106 raise AssertionError("Invalid socket operation")
1108 # No override by default
1112 # Poll only for certain operations and when asked for by an override
1113 if (event_override or
1114 op in (self.OP_SEND, self.OP_RECV, self.OP_CLOSE_CHECK)):
1116 wait_for_event = event_override
1118 wait_for_event = event_poll
1120 event = self._WaitForCondition(wait_for_event, timeout)
1122 raise _HttpClientTimeout(timeout_msg)
1124 if (op == self.OP_RECV and
1125 event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
1128 if not event & wait_for_event:
1136 if op == self.OP_SEND:
1137 return self.sock.send(arg1)
1139 elif op in (self.OP_RECV, self.OP_CLOSE_CHECK):
1140 return self.sock.recv(arg1)
1142 elif op == self.OP_SHUTDOWN:
1144 # PyOpenSSL's shutdown() doesn't take arguments
1145 return self.sock.shutdown()
1147 return self.sock.shutdown(arg1)
1149 except OpenSSL.SSL.WantWriteError:
1150 # OpenSSL wants to write, poll for POLLOUT
1151 event_override = select.POLLOUT
1154 except OpenSSL.SSL.WantReadError:
1155 # OpenSSL wants to read, poll for POLLIN
1156 event_override = select.POLLIN | select.POLLPRI
1159 except OpenSSL.SSL.WantX509LookupError:
1162 except OpenSSL.SSL.SysCallError, err:
1163 if op == self.OP_SEND:
1164 # arg1 is the data when writing
1165 if err.args and err.args[0] == -1 and arg1 == "":
1166 # errors when writing empty strings are expected
1167 # and can be ignored
1170 elif op == self.OP_RECV:
1171 if err.args == (-1, _SSL_UNEXPECTED_EOF):
1174 raise socket.error(err.args)
1176 except OpenSSL.SSL.Error, err:
1177 raise socket.error(err.args)
1179 except socket.error, err:
1180 if err.args and err.args[0] == errno.EAGAIN:
1184 raise _HttpClientError("%s: %s" % (error_msg, str(err)))
1187 """Non-blocking connect to host with timeout.
1193 connect_error = self.sock.connect_ex((self.request.host,
1195 except socket.gaierror, err:
1196 raise _HttpClientError("Connection failed: %s" % str(err))
1198 if connect_error == errno.EINTR:
1202 elif connect_error == 0:
1203 # Connection established
1207 elif connect_error == errno.EINPROGRESS:
1208 # Connection started
1211 raise _HttpClientError("Connection failed (%s: %s)" %
1212 (connect_error, os.strerror(connect_error)))
1215 # Wait for connection
1216 event = self._WaitForCondition(select.POLLOUT, self.CONNECT_TIMEOUT)
1218 raise _HttpClientError("Timeout while connecting to server")
1221 connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1222 if connect_error != 0:
1223 raise _HttpClientError("Connection failed (%s: %s)" %
1224 (connect_error, os.strerror(connect_error)))
1226 # Enable TCP keep-alive
1227 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1229 # If needed, Linux specific options are available to change the TCP
1230 # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1233 def _SendRequest(self):
1234 """Sends request to server.
1237 buf = self._BuildRequest()
1240 # Send only 4 KB at a time
1243 sent = self._SocketOperation(self.OP_SEND, data,
1244 "Error while sending request",
1245 "Timeout while sending request")
1250 assert not buf, "Request wasn't sent completely"
1252 def _ReadResponse(self):
1253 """Read response from server.
1255 Calls the parser function after reading a chunk of data.
1260 while self.parser_status != self.PS_COMPLETE:
1261 data = self._SocketOperation(self.OP_RECV, 4096,
1262 "Error while reading response",
1263 "Timeout while reading response")
1270 # Do some parsing and error checking while more data arrives
1271 buf = self._ParseBuffer(buf, eof)
1273 # Must be done only after the buffer has been evaluated
1275 self.parser_status in (self.PS_STATUS_LINE,
1277 raise _HttpClientError("Connection closed prematurely")
1280 buf = self._ParseBuffer(buf, True)
1282 assert self.parser_status == self.PS_COMPLETE
1283 assert not buf, "Parser didn't read full response"
1285 def _CloseConnection(self, force):
1286 """Closes the connection.
1289 if self.server_will_close and not force:
1290 # Wait for server to close
1292 # Check whether it's actually closed
1293 if not self._SocketOperation(self.OP_CLOSE_CHECK, 1,
1294 "Error", "Timeout"):
1296 except (socket.error, _HttpClientError):
1297 # Ignore errors at this stage
1300 # Close the connection from our side
1301 self._SocketOperation(self.OP_SHUTDOWN, socket.SHUT_RDWR,
1302 "Error while shutting down connection",
1303 "Timeout while shutting down connection")
1306 class _HttpClientPendingRequest(object):
1307 """Data class for pending requests.
1310 def __init__(self, request):
1311 self.request = request
1313 # Thread synchronization
1314 self.done = threading.Event()
1317 class HttpClientWorker(workerpool.BaseWorker):
1318 """HTTP client worker class.
1321 def RunTask(self, pend_req):
1323 HttpClientRequestExecutor(pend_req.request)
1328 class HttpClientWorkerPool(workerpool.WorkerPool):
1329 def __init__(self, manager):
1330 workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
1332 self.manager = manager
1335 class HttpClientManager(object):
1336 """Manages HTTP requests.
1340 self._wpool = HttpClientWorkerPool(self)
1345 def ExecRequests(self, requests):
1346 """Execute HTTP requests.
1348 This function can be called from multiple threads at the same time.
1350 @type requests: List of HttpClientRequest instances
1351 @param requests: The requests to execute
1352 @rtype: List of HttpClientRequest instances
1353 @returns: The list of requests passed in
1356 # _HttpClientPendingRequest is used for internal thread synchronization
1357 pending = [_HttpClientPendingRequest(req) for req in requests]
1360 # Add requests to queue
1361 for pend_req in pending:
1362 self._wpool.AddTask(pend_req)
1365 # In case of an exception we should still wait for the rest, otherwise
1366 # another thread from the worker pool could modify the request object
1367 # after we returned.
1369 # And wait for them to finish
1370 for pend_req in pending:
1371 pend_req.done.wait()
1373 # Return original list
1377 self._wpool.Quiesce()
1378 self._wpool.TerminateWorkers()
1381 class _SSLFileObject(object):
1382 """Wrapper around socket._fileobject
1384 This wrapper is required to handle OpenSSL exceptions.
1387 def _RequireOpenSocket(fn):
1388 def wrapper(self, *args, **kwargs):
1390 raise SocketClosed("Socket is closed")
1391 return fn(self, *args, **kwargs)
1394 def __init__(self, sock, mode='rb', bufsize=-1):
1395 self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1397 def _ConnectionLost(self):
1400 def _getclosed(self):
1401 return self._base is None or self._base.closed
1402 closed = property(_getclosed, doc="True if the file is closed")
1406 return self._base.close()
1410 return self._base.flush()
1414 return self._base.fileno()
def read(self, size=-1):
  """Read from the wrapped file object by way of _ReadWrapper.

  """
  reader = self._base.read
  return self._ReadWrapper(reader, size=size)
def readline(self, size=-1):
  """Read one line from the wrapped file object by way of _ReadWrapper.

  """
  line_reader = self._base.readline
  return self._ReadWrapper(line_reader, size=size)
1424 def _ReadWrapper(self, fn, *args, **kwargs):
1427 return fn(*args, **kwargs)
1429 except OpenSSL.SSL.ZeroReturnError, err:
1430 self._ConnectionLost()
1433 except OpenSSL.SSL.WantReadError:
1436 #except OpenSSL.SSL.WantWriteError:
1439 except OpenSSL.SSL.SysCallError, (retval, desc):
1440 if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
1442 self._ConnectionLost()
1445 logging.exception("Error in OpenSSL")
1446 self._ConnectionLost()
1447 raise socket.error(err.args)
1449 except OpenSSL.SSL.Error, err:
1450 self._ConnectionLost()
1451 raise socket.error(err.args)
def write(self, data):
  """Write data to the wrapped file object by way of _WriteWrapper.

  """
  writer = self._base.write
  return self._WriteWrapper(writer, data)
1457 def _WriteWrapper(self, fn, *args, **kwargs):
1460 return fn(*args, **kwargs)
1461 except OpenSSL.SSL.ZeroReturnError, err:
1462 self._ConnectionLost()
1465 except OpenSSL.SSL.WantWriteError:
1468 #except OpenSSL.SSL.WantReadError:
1471 except OpenSSL.SSL.SysCallError, err:
1472 if err.args[0] == -1 and data == "":
1473 # errors when writing empty strings are expected
1474 # and can be ignored
1477 self._ConnectionLost()
1478 raise socket.error(err.args)
1480 except OpenSSL.SSL.Error, err:
1481 self._ConnectionLost()
1482 raise socket.error(err.args)