ganeti.http: Ignore ENOTCONN when shutting down the connection
[ganeti-local] / lib / http / __init__.py
1 #
2 #
3
4 # Copyright (C) 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """HTTP module.
22
23 """
24
25 import logging
26 import mimetools
27 import OpenSSL
28 import select
29 import socket
30 import errno
31
32 from cStringIO import StringIO
33
34 from ganeti import constants
35 from ganeti import serializer
36 from ganeti import utils
37
38
39 HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
40
41 HTTP_OK = 200
42 HTTP_NO_CONTENT = 204
43 HTTP_NOT_MODIFIED = 304
44
45 HTTP_0_9 = "HTTP/0.9"
46 HTTP_1_0 = "HTTP/1.0"
47 HTTP_1_1 = "HTTP/1.1"
48
49 HTTP_GET = "GET"
50 HTTP_HEAD = "HEAD"
51 HTTP_POST = "POST"
52 HTTP_PUT = "PUT"
53 HTTP_DELETE = "DELETE"
54
55 HTTP_ETAG = "ETag"
56 HTTP_HOST = "Host"
57 HTTP_SERVER = "Server"
58 HTTP_DATE = "Date"
59 HTTP_USER_AGENT = "User-Agent"
60 HTTP_CONTENT_TYPE = "Content-Type"
61 HTTP_CONTENT_LENGTH = "Content-Length"
62 HTTP_CONNECTION = "Connection"
63 HTTP_KEEP_ALIVE = "Keep-Alive"
64
65 _SSL_UNEXPECTED_EOF = "Unexpected EOF"
66
67 # Socket operations
68 (SOCKOP_SEND,
69  SOCKOP_RECV,
70  SOCKOP_SHUTDOWN,
71  SOCKOP_HANDSHAKE) = range(4)
72
73 # send/receive quantum
74 SOCK_BUF_SIZE = 32768
75
76
77 class HttpError(Exception):
78   """Internal exception for HTTP errors.
79
80   This should only be used for internal error reporting.
81
82   """
83
84
85 class HttpConnectionClosed(Exception):
86   """Internal exception for a closed connection.
87
88   This should only be used for internal error reporting. Only use
89   it if there's no other way to report this condition.
90
91   """
92
93
94 class HttpSessionHandshakeUnexpectedEOF(HttpError):
95   """Internal exception for errors during SSL handshake.
96
97   This should only be used for internal error reporting.
98
99   """
100
101
102 class HttpSocketTimeout(Exception):
103   """Internal exception for socket timeouts.
104
105   This should only be used for internal error reporting.
106
107   """
108
109
110 class HttpException(Exception):
111   code = None
112   message = None
113
114   def __init__(self, message=None, headers=None):
115     Exception.__init__(self)
116     self.message = message
117     self.headers = headers
118
119
120 class HttpBadRequest(HttpException):
121   code = 400
122
123
124 class HttpForbidden(HttpException):
125   code = 403
126
127
128 class HttpNotFound(HttpException):
129   code = 404
130
131
132 class HttpGone(HttpException):
133   code = 410
134
135
136 class HttpLengthRequired(HttpException):
137   code = 411
138
139
140 class HttpInternalError(HttpException):
141   code = 500
142
143
144 class HttpNotImplemented(HttpException):
145   code = 501
146
147
148 class HttpServiceUnavailable(HttpException):
149   code = 503
150
151
152 class HttpVersionNotSupported(HttpException):
153   code = 505
154
155
156 class HttpJsonConverter:
157   CONTENT_TYPE = "application/json"
158
159   def Encode(self, data):
160     return serializer.DumpJson(data)
161
162   def Decode(self, data):
163     return serializer.LoadJson(data)
164
165
166 def WaitForSocketCondition(poller, sock, event, timeout):
167   """Waits for a condition to occur on the socket.
168
169   @type poller: select.Poller
170   @param poller: Poller object as created by select.poll()
171   @type sock: socket
172   @param sock: Wait for events on this socket
173   @type event: int
174   @param event: ORed condition (see select module)
175   @type timeout: float or None
176   @param timeout: Timeout in seconds
177   @rtype: int or None
178   @return: None for timeout, otherwise occured conditions
179
180   """
181   check = (event | select.POLLPRI |
182            select.POLLNVAL | select.POLLHUP | select.POLLERR)
183
184   if timeout is not None:
185     # Poller object expects milliseconds
186     timeout *= 1000
187
188   poller.register(sock, event)
189   try:
190     while True:
191       # TODO: If the main thread receives a signal and we have no timeout, we
192       # could wait forever. This should check a global "quit" flag or
193       # something every so often.
194       io_events = poller.poll(timeout)
195       if not io_events:
196         # Timeout
197         return None
198       for (evfd, evcond) in io_events:
199         if evcond & check:
200           return evcond
201   finally:
202     poller.unregister(sock)
203
204
205 def SocketOperation(poller, sock, op, arg1, timeout):
206   """Wrapper around socket functions.
207
208   This function abstracts error handling for socket operations, especially
209   for the complicated interaction with OpenSSL.
210
211   @type poller: select.Poller
212   @param poller: Poller object as created by select.poll()
213   @type sock: socket
214   @param sock: Socket for the operation
215   @type op: int
216   @param op: Operation to execute (SOCKOP_* constants)
217   @type arg1: any
218   @param arg1: Parameter for function (if needed)
219   @type timeout: None or float
220   @param timeout: Timeout in seconds or None
221   @return: Return value of socket function
222
223   """
224   # TODO: event_poll/event_check/override
225   if op in (SOCKOP_SEND, SOCKOP_HANDSHAKE):
226     event_poll = select.POLLOUT
227     event_check = select.POLLOUT
228
229   elif op == SOCKOP_RECV:
230     event_poll = select.POLLIN
231     event_check = select.POLLIN | select.POLLPRI
232
233   elif op == SOCKOP_SHUTDOWN:
234     event_poll = None
235     event_check = None
236
237     # The timeout is only used when OpenSSL requests polling for a condition.
238     # It is not advisable to have no timeout for shutdown.
239     assert timeout
240
241   else:
242     raise AssertionError("Invalid socket operation")
243
244   # Handshake is only supported by SSL sockets
245   if (op == SOCKOP_HANDSHAKE and
246       not isinstance(sock, OpenSSL.SSL.ConnectionType)):
247     return
248
249   # No override by default
250   event_override = 0
251
252   while True:
253     # Poll only for certain operations and when asked for by an override
254     if event_override or op in (SOCKOP_SEND, SOCKOP_RECV, SOCKOP_HANDSHAKE):
255       if event_override:
256         wait_for_event = event_override
257       else:
258         wait_for_event = event_poll
259
260       event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
261       if event is None:
262         raise HttpSocketTimeout()
263
264       if (op == SOCKOP_RECV and
265           event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
266         return ""
267
268       if not event & wait_for_event:
269         continue
270
271     # Reset override
272     event_override = 0
273
274     try:
275       try:
276         if op == SOCKOP_SEND:
277           return sock.send(arg1)
278
279         elif op == SOCKOP_RECV:
280           return sock.recv(arg1)
281
282         elif op == SOCKOP_SHUTDOWN:
283           if isinstance(sock, OpenSSL.SSL.ConnectionType):
284             # PyOpenSSL's shutdown() doesn't take arguments
285             return sock.shutdown()
286           else:
287             return sock.shutdown(arg1)
288
289         elif op == SOCKOP_HANDSHAKE:
290           return sock.do_handshake()
291
292       except OpenSSL.SSL.WantWriteError:
293         # OpenSSL wants to write, poll for POLLOUT
294         event_override = select.POLLOUT
295         continue
296
297       except OpenSSL.SSL.WantReadError:
298         # OpenSSL wants to read, poll for POLLIN
299         event_override = select.POLLIN | select.POLLPRI
300         continue
301
302       except OpenSSL.SSL.WantX509LookupError:
303         continue
304
305       except OpenSSL.SSL.ZeroReturnError, err:
306         # SSL Connection has been closed. In SSL 3.0 and TLS 1.0, this only
307         # occurs if a closure alert has occurred in the protocol, i.e. the
308         # connection has been closed cleanly. Note that this does not
309         # necessarily mean that the transport layer (e.g. a socket) has been
310         # closed.
311         if op == SOCKOP_SEND:
312           # Can happen during a renegotiation
313           raise HttpConnectionClosed(err.args)
314         elif op == SOCKOP_RECV:
315           return ""
316
317         # SSL_shutdown shouldn't return SSL_ERROR_ZERO_RETURN
318         raise socket.error(err.args)
319
320       except OpenSSL.SSL.SysCallError, err:
321         if op == SOCKOP_SEND:
322           # arg1 is the data when writing
323           if err.args and err.args[0] == -1 and arg1 == "":
324             # errors when writing empty strings are expected
325             # and can be ignored
326             return 0
327
328         if err.args == (-1, _SSL_UNEXPECTED_EOF):
329           if op == SOCKOP_RECV:
330             return ""
331           elif op == SOCKOP_HANDSHAKE:
332             # Can happen if peer disconnects directly after the connection is
333             # opened.
334             raise HttpSessionHandshakeUnexpectedEOF(err.args)
335
336         raise socket.error(err.args)
337
338       except OpenSSL.SSL.Error, err:
339         raise socket.error(err.args)
340
341     except socket.error, err:
342       if err.args and err.args[0] == errno.EAGAIN:
343         # Ignore EAGAIN
344         continue
345
346       raise
347
348
349 def ShutdownConnection(poller, sock, close_timeout, write_timeout, msgreader,
350                        force):
351   """Closes the connection.
352
353   @type poller: select.Poller
354   @param poller: Poller object as created by select.poll()
355   @type sock: socket
356   @param sock: Socket to be shut down
357   @type close_timeout: float
358   @param close_timeout: How long to wait for the peer to close the connection
359   @type write_timeout: float
360   @param write_timeout: Write timeout for shutdown
361   @type msgreader: http.HttpMessageReader
362   @param msgreader: Request message reader, used to determine whether peer
363                     should close connection
364   @type force: bool
365   @param force: Whether to forcibly close the connection without waiting
366                 for peer
367
368   """
369   poller = select.poll()
370
371   #print msgreader.peer_will_close, force
372   if msgreader and msgreader.peer_will_close and not force:
373     # Wait for peer to close
374     try:
375       # Check whether it's actually closed
376       if not SocketOperation(poller, sock, SOCKOP_RECV, 1, close_timeout):
377         return
378     except (socket.error, HttpError, HttpSocketTimeout):
379       # Ignore errors at this stage
380       pass
381
382   # Close the connection from our side
383   try:
384     # We don't care about the return value, see NOTES in SSL_shutdown(3).
385     SocketOperation(poller, sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
386                     write_timeout)
387   except HttpSocketTimeout:
388     raise HttpError("Timeout while shutting down connection")
389   except socket.error, err:
390     # Ignore ENOTCONN
391     if not (err.args and err.args[0] == errno.ENOTCONN):
392       raise HttpError("Error while shutting down connection: %s" % err)
393
394
395 def Handshake(poller, sock, write_timeout):
396   """Shakes peer's hands.
397
398   @type poller: select.Poller
399   @param poller: Poller object as created by select.poll()
400   @type sock: socket
401   @param sock: Socket to be shut down
402   @type write_timeout: float
403   @param write_timeout: Write timeout for handshake
404
405   """
406   try:
407     return SocketOperation(poller, sock, SOCKOP_HANDSHAKE, None, write_timeout)
408   except HttpSocketTimeout:
409     raise HttpError("Timeout during SSL handshake")
410   except socket.error, err:
411     raise HttpError("Error in SSL handshake: %s" % err)
412
413
414 class HttpSslParams(object):
415   """Data class for SSL key and certificate.
416
417   """
418   def __init__(self, ssl_key_path, ssl_cert_path):
419     """Initializes this class.
420
421     @type ssl_key_path: string
422     @param ssl_key_path: Path to file containing SSL key in PEM format
423     @type ssl_cert_path: string
424     @param ssl_cert_path: Path to file containing SSL certificate in PEM format
425
426     """
427     self.ssl_key_pem = utils.ReadFile(ssl_key_path)
428     self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
429
430   def GetKey(self):
431     return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
432                                           self.ssl_key_pem)
433
434   def GetCertificate(self):
435     return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
436                                            self.ssl_cert_pem)
437
438
439 class HttpBase(object):
440   """Base class for HTTP server and client.
441
442   """
443   def __init__(self):
444     self.using_ssl = None
445     self._ssl_params = None
446     self._ssl_key = None
447     self._ssl_cert = None
448
449   def _CreateSocket(self, ssl_params, ssl_verify_peer):
450     """Creates a TCP socket and initializes SSL if needed.
451
452     @type ssl_params: HttpSslParams
453     @param ssl_params: SSL key and certificate
454     @type ssl_verify_peer: bool
455     @param ssl_verify_peer: Whether to require client certificate and compare
456                             it with our certificate
457
458     """
459     self._ssl_params = ssl_params
460
461     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
462
463     # Should we enable SSL?
464     self.using_ssl = ssl_params is not None
465
466     if not self.using_ssl:
467       return sock
468
469     self._ssl_key = ssl_params.GetKey()
470     self._ssl_cert = ssl_params.GetCertificate()
471
472     ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
473     ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
474
475     ctx.use_privatekey(self._ssl_key)
476     ctx.use_certificate(self._ssl_cert)
477     ctx.check_privatekey()
478
479     if ssl_verify_peer:
480       ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
481                      OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
482                      self._SSLVerifyCallback)
483
484     return OpenSSL.SSL.Connection(ctx, sock)
485
486   def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
487     """Verify the certificate provided by the peer
488
489     We only compare fingerprints. The client must use the same certificate as
490     we do on our side.
491
492     """
493     assert self._ssl_params, "SSL not initialized"
494
495     return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
496             self._ssl_cert.digest("md5") == cert.digest("md5"))
497
498
499 class HttpMessage(object):
500   """Data structure for HTTP message.
501
502   """
503   def __init__(self):
504     self.start_line = None
505     self.headers = None
506     self.body = None
507     self.decoded_body = None
508
509
510 class HttpClientToServerStartLine(object):
511   """Data structure for HTTP request start line.
512
513   """
514   def __init__(self, method, path, version):
515     self.method = method
516     self.path = path
517     self.version = version
518
519   def __str__(self):
520     return "%s %s %s" % (self.method, self.path, self.version)
521
522
523 class HttpServerToClientStartLine(object):
524   """Data structure for HTTP response start line.
525
526   """
527   def __init__(self, version, code, reason):
528     self.version = version
529     self.code = code
530     self.reason = reason
531
532   def __str__(self):
533     return "%s %s %s" % (self.version, self.code, self.reason)
534
535
536 class HttpMessageWriter(object):
537   """Writes an HTTP message to a socket.
538
539   """
540   def __init__(self, sock, msg, write_timeout):
541     """Initializes this class and writes an HTTP message to a socket.
542
543     @type sock: socket
544     @param sock: Socket to be written to
545     @type msg: http.HttpMessage
546     @param msg: HTTP message to be written
547     @type write_timeout: float
548     @param write_timeout: Write timeout for socket
549
550     """
551     self._msg = msg
552
553     self._PrepareMessage()
554
555     buf = self._FormatMessage()
556
557     poller = select.poll()
558
559     pos = 0
560     end = len(buf)
561     while pos < end:
562       # Send only SOCK_BUF_SIZE bytes at a time
563       data = buf[pos:(pos + SOCK_BUF_SIZE)]
564
565       sent = SocketOperation(poller, sock, SOCKOP_SEND, data,
566                              write_timeout)
567
568       # Remove sent bytes
569       pos += sent
570
571     assert pos == end, "Message wasn't sent completely"
572
573   def _PrepareMessage(self):
574     """Prepares the HTTP message by setting mandatory headers.
575
576     """
577     # RFC2616, section 4.3: "The presence of a message-body in a request is
578     # signaled by the inclusion of a Content-Length or Transfer-Encoding header
579     # field in the request's message-headers."
580     if self._msg.body:
581       self._msg.headers[HTTP_CONTENT_LENGTH] = len(self._msg.body)
582
583   def _FormatMessage(self):
584     """Serializes the HTTP message into a string.
585
586     """
587     buf = StringIO()
588
589     # Add start line
590     buf.write(str(self._msg.start_line))
591     buf.write("\r\n")
592
593     # Add headers
594     if self._msg.start_line.version != HTTP_0_9:
595       for name, value in self._msg.headers.iteritems():
596         buf.write("%s: %s\r\n" % (name, value))
597
598     buf.write("\r\n")
599
600     # Add message body if needed
601     if self.HasMessageBody():
602       buf.write(self._msg.body)
603
604     elif self._msg.body:
605       logging.warning("Ignoring message body")
606
607     return buf.getvalue()
608
609   def HasMessageBody(self):
610     """Checks whether the HTTP message contains a body.
611
612     Can be overriden by subclasses.
613
614     """
615     return bool(self._msg.body)
616
617
618 class HttpMessageReader(object):
619   """Reads HTTP message from socket.
620
621   """
622   # Length limits
623   START_LINE_LENGTH_MAX = None
624   HEADER_LENGTH_MAX = None
625
626   # Parser state machine
627   PS_START_LINE = "start-line"
628   PS_HEADERS = "headers"
629   PS_BODY = "entity-body"
630   PS_COMPLETE = "complete"
631
632   def __init__(self, sock, msg, read_timeout):
633     """Reads an HTTP message from a socket.
634
635     @type sock: socket
636     @param sock: Socket to be read from
637     @type msg: http.HttpMessage
638     @param msg: Object for the read message
639     @type read_timeout: float
640     @param read_timeout: Read timeout for socket
641
642     """
643     self.sock = sock
644     self.msg = msg
645
646     self.poller = select.poll()
647     self.start_line_buffer = None
648     self.header_buffer = StringIO()
649     self.body_buffer = StringIO()
650     self.parser_status = self.PS_START_LINE
651     self.content_length = None
652     self.peer_will_close = None
653
654     buf = ""
655     eof = False
656     while self.parser_status != self.PS_COMPLETE:
657       # TODO: Don't read more than necessary (Content-Length), otherwise
658       # data might be lost and/or an error could occur
659       data = SocketOperation(self.poller, sock, SOCKOP_RECV, SOCK_BUF_SIZE,
660                              read_timeout)
661
662       if data:
663         buf += data
664       else:
665         eof = True
666
667       # Do some parsing and error checking while more data arrives
668       buf = self._ContinueParsing(buf, eof)
669
670       # Must be done only after the buffer has been evaluated
671       # TODO: Connection-length < len(data read) and connection closed
672       if (eof and
673           self.parser_status in (self.PS_START_LINE,
674                                  self.PS_HEADERS)):
675         raise HttpError("Connection closed prematurely")
676
677     # Parse rest
678     buf = self._ContinueParsing(buf, True)
679
680     assert self.parser_status == self.PS_COMPLETE
681     assert not buf, "Parser didn't read full response"
682
683     msg.body = self.body_buffer.getvalue()
684
685     # TODO: Content-type, error handling
686     if msg.body:
687       msg.decoded_body = HttpJsonConverter().Decode(msg.body)
688     else:
689       msg.decoded_body = None
690
691     if msg.decoded_body:
692       logging.debug("Message body: %s", msg.decoded_body)
693
694   def _ContinueParsing(self, buf, eof):
695     """Main function for HTTP message state machine.
696
697     @type buf: string
698     @param buf: Receive buffer
699     @type eof: bool
700     @param eof: Whether we've reached EOF on the socket
701     @rtype: string
702     @return: Updated receive buffer
703
704     """
705     # TODO: Use offset instead of slicing when possible
706     if self.parser_status == self.PS_START_LINE:
707       # Expect start line
708       while True:
709         idx = buf.find("\r\n")
710
711         # RFC2616, section 4.1: "In the interest of robustness, servers SHOULD
712         # ignore any empty line(s) received where a Request-Line is expected.
713         # In other words, if the server is reading the protocol stream at the
714         # beginning of a message and receives a CRLF first, it should ignore
715         # the CRLF."
716         if idx == 0:
717           # TODO: Limit number of CRLFs/empty lines for safety?
718           buf = buf[:2]
719           continue
720
721         if idx > 0:
722           self.start_line_buffer = buf[:idx]
723
724           self._CheckStartLineLength(len(self.start_line_buffer))
725
726           # Remove status line, including CRLF
727           buf = buf[idx + 2:]
728
729           self.msg.start_line = self.ParseStartLine(self.start_line_buffer)
730
731           self.parser_status = self.PS_HEADERS
732         else:
733           # Check whether incoming data is getting too large, otherwise we just
734           # fill our read buffer.
735           self._CheckStartLineLength(len(buf))
736
737         break
738
739     # TODO: Handle messages without headers
740     if self.parser_status == self.PS_HEADERS:
741       # Wait for header end
742       idx = buf.find("\r\n\r\n")
743       if idx >= 0:
744         self.header_buffer.write(buf[:idx + 2])
745
746         self._CheckHeaderLength(self.header_buffer.tell())
747
748         # Remove headers, including CRLF
749         buf = buf[idx + 4:]
750
751         self._ParseHeaders()
752
753         self.parser_status = self.PS_BODY
754       else:
755         # Check whether incoming data is getting too large, otherwise we just
756         # fill our read buffer.
757         self._CheckHeaderLength(len(buf))
758
759     if self.parser_status == self.PS_BODY:
760       # TODO: Implement max size for body_buffer
761       self.body_buffer.write(buf)
762       buf = ""
763
764       # Check whether we've read everything
765       #
766       # RFC2616, section 4.4: "When a message-body is included with a message,
767       # the transfer-length of that body is determined by one of the following
768       # [...] 5. By the server closing the connection. (Closing the connection
769       # cannot be used to indicate the end of a request body, since that would
770       # leave no possibility for the server to send back a response.)"
771       #
772       # TODO: Error when buffer length > Content-Length header
773       if (eof or
774           self.content_length is None or
775           (self.content_length is not None and
776            self.body_buffer.tell() >= self.content_length)):
777         self.parser_status = self.PS_COMPLETE
778
779     return buf
780
781   def _CheckStartLineLength(self, length):
782     """Limits the start line buffer size.
783
784     @type length: int
785     @param length: Buffer size
786
787     """
788     if (self.START_LINE_LENGTH_MAX is not None and
789         length > self.START_LINE_LENGTH_MAX):
790       raise HttpError("Start line longer than %d chars" %
791                        self.START_LINE_LENGTH_MAX)
792
793   def _CheckHeaderLength(self, length):
794     """Limits the header buffer size.
795
796     @type length: int
797     @param length: Buffer size
798
799     """
800     if (self.HEADER_LENGTH_MAX is not None and
801         length > self.HEADER_LENGTH_MAX):
802       raise HttpError("Headers longer than %d chars" % self.HEADER_LENGTH_MAX)
803
804   def ParseStartLine(self, start_line):
805     """Parses the start line of a message.
806
807     Must be overriden by subclass.
808
809     @type start_line: string
810     @param start_line: Start line string
811
812     """
813     raise NotImplementedError()
814
815   def _WillPeerCloseConnection(self):
816     """Evaluate whether peer will close the connection.
817
818     @rtype: bool
819     @return: Whether peer will close the connection
820
821     """
822     # RFC2616, section 14.10: "HTTP/1.1 defines the "close" connection option
823     # for the sender to signal that the connection will be closed after
824     # completion of the response. For example,
825     #
826     #        Connection: close
827     #
828     # in either the request or the response header fields indicates that the
829     # connection SHOULD NOT be considered `persistent' (section 8.1) after the
830     # current request/response is complete."
831
832     hdr_connection = self.msg.headers.get(HTTP_CONNECTION, None)
833     if hdr_connection:
834       hdr_connection = hdr_connection.lower()
835
836     # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
837     if self.msg.start_line.version == HTTP_1_1:
838       return (hdr_connection and "close" in hdr_connection)
839
840     # Some HTTP/1.0 implementations have support for persistent connections,
841     # using rules different than HTTP/1.1.
842
843     # For older HTTP, Keep-Alive indicates persistent connection.
844     if self.msg.headers.get(HTTP_KEEP_ALIVE):
845       return False
846
847     # At least Akamai returns a "Connection: Keep-Alive" header, which was
848     # supposed to be sent by the client.
849     if hdr_connection and "keep-alive" in hdr_connection:
850       return False
851
852     return True
853
854   def _ParseHeaders(self):
855     """Parses the headers.
856
857     This function also adjusts internal variables based on header values.
858
859     RFC2616, section 4.3: "The presence of a message-body in a request is
860     signaled by the inclusion of a Content-Length or Transfer-Encoding header
861     field in the request's message-headers."
862
863     """
864     # Parse headers
865     self.header_buffer.seek(0, 0)
866     self.msg.headers = mimetools.Message(self.header_buffer, 0)
867
868     self.peer_will_close = self._WillPeerCloseConnection()
869
870     # Do we have a Content-Length header?
871     hdr_content_length = self.msg.headers.get(HTTP_CONTENT_LENGTH, None)
872     if hdr_content_length:
873       try:
874         self.content_length = int(hdr_content_length)
875       except ValueError:
876         self.content_length = None
877       if self.content_length is not None and self.content_length < 0:
878         self.content_length = None
879
880     # if the connection remains open and a content-length was not provided,
881     # then assume that the connection WILL close.
882     if self.content_length is None:
883       self.peer_will_close = True