Statistics
| Branch: | Tag: | Revision:

root / lib / http / __init__.py @ 5a9c3f46

History | View | Annotate | Download (21.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP module.
22

23
"""
24

    
25
import logging
26
import mimetools
27
import OpenSSL
28
import select
29
import socket
30
import errno
31

    
32
from cStringIO import StringIO
33

    
34
from ganeti import constants
35
from ganeti import serializer
36
from ganeti import utils
37

    
38

    
39
HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
40

    
41
HTTP_OK = 200
42
HTTP_NO_CONTENT = 204
43
HTTP_NOT_MODIFIED = 304
44

    
45
HTTP_0_9 = "HTTP/0.9"
46
HTTP_1_0 = "HTTP/1.0"
47
HTTP_1_1 = "HTTP/1.1"
48

    
49
HTTP_GET = "GET"
50
HTTP_HEAD = "HEAD"
51
HTTP_POST = "POST"
52
HTTP_PUT = "PUT"
53
HTTP_DELETE = "DELETE"
54

    
55
HTTP_ETAG = "ETag"
56
HTTP_HOST = "Host"
57
HTTP_SERVER = "Server"
58
HTTP_DATE = "Date"
59
HTTP_USER_AGENT = "User-Agent"
60
HTTP_CONTENT_TYPE = "Content-Type"
61
HTTP_CONTENT_LENGTH = "Content-Length"
62
HTTP_CONNECTION = "Connection"
63
HTTP_KEEP_ALIVE = "Keep-Alive"
64

    
65
_SSL_UNEXPECTED_EOF = "Unexpected EOF"
66

    
67
# Socket operations
68
(SOCKOP_SEND,
69
 SOCKOP_RECV,
70
 SOCKOP_SHUTDOWN) = range(3)
71

    
72
# send/receive quantum
73
SOCK_BUF_SIZE = 32768
74

    
75

    
76
class HttpError(Exception):
77
  """Internal exception for HTTP errors.
78

79
  This should only be used for internal error reporting.
80

81
  """
82

    
83

    
84
class HttpSocketTimeout(Exception):
85
  """Internal exception for socket timeouts.
86

87
  This should only be used for internal error reporting.
88

89
  """
90

    
91

    
92
class HttpException(Exception):
93
  code = None
94
  message = None
95

    
96
  def __init__(self, message=None):
97
    Exception.__init__(self)
98
    if message is not None:
99
      self.message = message
100

    
101

    
102
class HttpBadRequest(HttpException):
103
  code = 400
104

    
105

    
106
class HttpForbidden(HttpException):
107
  code = 403
108

    
109

    
110
class HttpNotFound(HttpException):
111
  code = 404
112

    
113

    
114
class HttpGone(HttpException):
115
  code = 410
116

    
117

    
118
class HttpLengthRequired(HttpException):
119
  code = 411
120

    
121

    
122
class HttpInternalError(HttpException):
123
  code = 500
124

    
125

    
126
class HttpNotImplemented(HttpException):
127
  code = 501
128

    
129

    
130
class HttpServiceUnavailable(HttpException):
131
  code = 503
132

    
133

    
134
class HttpVersionNotSupported(HttpException):
135
  code = 505
136

    
137

    
138
class HttpJsonConverter:
139
  CONTENT_TYPE = "application/json"
140

    
141
  def Encode(self, data):
142
    return serializer.DumpJson(data)
143

    
144
  def Decode(self, data):
145
    return serializer.LoadJson(data)
146

    
147

    
148
def WaitForSocketCondition(poller, sock, event, timeout):
149
  """Waits for a condition to occur on the socket.
150

151
  @type poller: select.Poller
152
  @param poller: Poller object as created by select.poll()
153
  @type sock: socket
154
  @param sock: Wait for events on this socket
155
  @type event: int
156
  @param event: ORed condition (see select module)
157
  @type timeout: float or None
158
  @param timeout: Timeout in seconds
159
  @rtype: int or None
160
  @return: None for timeout, otherwise occured conditions
161

162
  """
163
  check = (event | select.POLLPRI |
164
           select.POLLNVAL | select.POLLHUP | select.POLLERR)
165

    
166
  if timeout is not None:
167
    # Poller object expects milliseconds
168
    timeout *= 1000
169

    
170
  poller.register(sock, event)
171
  try:
172
    while True:
173
      # TODO: If the main thread receives a signal and we have no timeout, we
174
      # could wait forever. This should check a global "quit" flag or
175
      # something every so often.
176
      io_events = poller.poll(timeout)
177
      if not io_events:
178
        # Timeout
179
        return None
180
      for (evfd, evcond) in io_events:
181
        if evcond & check:
182
          return evcond
183
  finally:
184
    poller.unregister(sock)
185

    
186

    
187
def SocketOperation(poller, sock, op, arg1, timeout):
188
  """Wrapper around socket functions.
189

190
  This function abstracts error handling for socket operations, especially
191
  for the complicated interaction with OpenSSL.
192

193
  @type poller: select.Poller
194
  @param poller: Poller object as created by select.poll()
195
  @type sock: socket
196
  @param sock: Socket for the operation
197
  @type op: int
198
  @param op: Operation to execute (SOCKOP_* constants)
199
  @type arg1: any
200
  @param arg1: Parameter for function (if needed)
201
  @type timeout: None or float
202
  @param timeout: Timeout in seconds or None
203
  @return: Return value of socket function
204

205
  """
206
  # TODO: event_poll/event_check/override
207
  if op == SOCKOP_SEND:
208
    event_poll = select.POLLOUT
209
    event_check = select.POLLOUT
210

    
211
  elif op == SOCKOP_RECV:
212
    event_poll = select.POLLIN
213
    event_check = select.POLLIN | select.POLLPRI
214

    
215
  elif op == SOCKOP_SHUTDOWN:
216
    event_poll = None
217
    event_check = None
218

    
219
    # The timeout is only used when OpenSSL requests polling for a condition.
220
    # It is not advisable to have no timeout for shutdown.
221
    assert timeout
222

    
223
  else:
224
    raise AssertionError("Invalid socket operation")
225

    
226
  # No override by default
227
  event_override = 0
228

    
229
  while True:
230
    # Poll only for certain operations and when asked for by an override
231
    if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
232
      if event_override:
233
        wait_for_event = event_override
234
      else:
235
        wait_for_event = event_poll
236

    
237
      event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
238
      if event is None:
239
        raise HttpSocketTimeout()
240

    
241
      if (op == SOCKOP_RECV and
242
          event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
243
        return ""
244

    
245
      if not event & wait_for_event:
246
        continue
247

    
248
    # Reset override
249
    event_override = 0
250

    
251
    try:
252
      try:
253
        if op == SOCKOP_SEND:
254
          return sock.send(arg1)
255

    
256
        elif op == SOCKOP_RECV:
257
          return sock.recv(arg1)
258

    
259
        elif op == SOCKOP_SHUTDOWN:
260
          if isinstance(sock, OpenSSL.SSL.ConnectionType):
261
            # PyOpenSSL's shutdown() doesn't take arguments
262
            return sock.shutdown()
263
          else:
264
            return sock.shutdown(arg1)
265

    
266
      except OpenSSL.SSL.WantWriteError:
267
        # OpenSSL wants to write, poll for POLLOUT
268
        event_override = select.POLLOUT
269
        continue
270

    
271
      except OpenSSL.SSL.WantReadError:
272
        # OpenSSL wants to read, poll for POLLIN
273
        event_override = select.POLLIN | select.POLLPRI
274
        continue
275

    
276
      except OpenSSL.SSL.WantX509LookupError:
277
        continue
278

    
279
      except OpenSSL.SSL.SysCallError, err:
280
        if op == SOCKOP_SEND:
281
          # arg1 is the data when writing
282
          if err.args and err.args[0] == -1 and arg1 == "":
283
            # errors when writing empty strings are expected
284
            # and can be ignored
285
            return 0
286

    
287
        elif op == SOCKOP_RECV:
288
          if err.args == (-1, _SSL_UNEXPECTED_EOF):
289
            return ""
290

    
291
        raise socket.error(err.args)
292

    
293
      except OpenSSL.SSL.Error, err:
294
        raise socket.error(err.args)
295

    
296
    except socket.error, err:
297
      if err.args and err.args[0] == errno.EAGAIN:
298
        # Ignore EAGAIN
299
        continue
300

    
301
      raise
302

    
303

    
304
def ShutdownConnection(poller, sock, close_timeout, write_timeout, msgreader,
305
                       force):
306
  """Closes the connection.
307

308
  @type poller: select.Poller
309
  @param poller: Poller object as created by select.poll()
310
  @type sock: socket
311
  @param sock: Socket to be shut down
312
  @type close_timeout: float
313
  @param close_timeout: How long to wait for the peer to close the connection
314
  @type write_timeout: float
315
  @param write_timeout: Write timeout for shutdown
316
  @type msgreader: http.HttpMessageReader
317
  @param msgreader: Request message reader, used to determine whether peer
318
                    should close connection
319
  @type force: bool
320
  @param force: Whether to forcibly close the connection without waiting
321
                for peer
322

323
  """
324
  poller = select.poll()
325

    
326
  #print msgreader.peer_will_close, force
327
  if msgreader and msgreader.peer_will_close and not force:
328
    # Wait for peer to close
329
    try:
330
      # Check whether it's actually closed
331
      if not SocketOperation(poller, sock, SOCKOP_RECV, 1, close_timeout):
332
        return
333
    except (socket.error, HttpError, HttpSocketTimeout):
334
      # Ignore errors at this stage
335
      pass
336

    
337
  # Close the connection from our side
338
  try:
339
    SocketOperation(poller, sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
340
                    write_timeout)
341
  except HttpSocketTimeout:
342
    raise HttpError("Timeout while shutting down connection")
343
  except socket.error, err:
344
    raise HttpError("Error while shutting down connection: %s" % err)
345

    
346

    
347
class HttpSslParams(object):
348
  """Data class for SSL key and certificate.
349

350
  """
351
  def __init__(self, ssl_key_path, ssl_cert_path):
352
    """Initializes this class.
353

354
    @type ssl_key_path: string
355
    @param ssl_key_path: Path to file containing SSL key in PEM format
356
    @type ssl_cert_path: string
357
    @param ssl_cert_path: Path to file containing SSL certificate in PEM format
358

359
    """
360
    self.ssl_key_pem = utils.ReadFile(ssl_key_path)
361
    self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
362

    
363
  def GetKey(self):
364
    return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
365
                                          self.ssl_key_pem)
366

    
367
  def GetCertificate(self):
368
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
369
                                           self.ssl_cert_pem)
370

    
371

    
372
class HttpBase(object):
373
  """Base class for HTTP server and client.
374

375
  """
376
  def __init__(self):
377
    self.using_ssl = None
378
    self._ssl_params = None
379
    self._ssl_key = None
380
    self._ssl_cert = None
381

    
382
  def _CreateSocket(self, ssl_params, ssl_verify_peer):
383
    """Creates a TCP socket and initializes SSL if needed.
384

385
    @type ssl_params: HttpSslParams
386
    @param ssl_params: SSL key and certificate
387
    @type ssl_verify_peer: bool
388
    @param ssl_verify_peer: Whether to require client certificate and compare
389
                            it with our certificate
390

391
    """
392
    self._ssl_params = ssl_params
393

    
394
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
395

    
396
    # Should we enable SSL?
397
    self.using_ssl = ssl_params is not None
398

    
399
    if not self.using_ssl:
400
      return sock
401

    
402
    self._ssl_key = ssl_params.GetKey()
403
    self._ssl_cert = ssl_params.GetCertificate()
404

    
405
    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
406
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
407

    
408
    ctx.use_privatekey(self._ssl_key)
409
    ctx.use_certificate(self._ssl_cert)
410
    ctx.check_privatekey()
411

    
412
    if ssl_verify_peer:
413
      ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
414
                     OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
415
                     self._SSLVerifyCallback)
416

    
417
    return OpenSSL.SSL.Connection(ctx, sock)
418

    
419
  def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
420
    """Verify the certificate provided by the peer
421

422
    We only compare fingerprints. The client must use the same certificate as
423
    we do on our side.
424

425
    """
426
    assert self._ssl_params, "SSL not initialized"
427

    
428
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
429
            self._ssl_cert.digest("md5") == cert.digest("md5"))
430

    
431

    
432
class HttpMessage(object):
433
  """Data structure for HTTP message.
434

435
  """
436
  def __init__(self):
437
    self.start_line = None
438
    self.headers = None
439
    self.body = None
440
    self.decoded_body = None
441

    
442

    
443
class HttpClientToServerStartLine(object):
444
  """Data structure for HTTP request start line.
445

446
  """
447
  def __init__(self, method, path, version):
448
    self.method = method
449
    self.path = path
450
    self.version = version
451

    
452
  def __str__(self):
453
    return "%s %s %s" % (self.method, self.path, self.version)
454

    
455

    
456
class HttpServerToClientStartLine(object):
457
  """Data structure for HTTP response start line.
458

459
  """
460
  def __init__(self, version, code, reason):
461
    self.version = version
462
    self.code = code
463
    self.reason = reason
464

    
465
  def __str__(self):
466
    return "%s %s %s" % (self.version, self.code, self.reason)
467

    
468

    
469
class HttpMessageWriter(object):
470
  """Writes an HTTP message to a socket.
471

472
  """
473
  def __init__(self, sock, msg, write_timeout):
474
    """Initializes this class and writes an HTTP message to a socket.
475

476
    @type sock: socket
477
    @param sock: Socket to be written to
478
    @type msg: http.HttpMessage
479
    @param msg: HTTP message to be written
480
    @type write_timeout: float
481
    @param write_timeout: Write timeout for socket
482

483
    """
484
    self._msg = msg
485

    
486
    self._PrepareMessage()
487

    
488
    buf = self._FormatMessage()
489

    
490
    poller = select.poll()
491

    
492
    pos = 0
493
    end = len(buf)
494
    while pos < end:
495
      # Send only SOCK_BUF_SIZE bytes at a time
496
      data = buf[pos:(pos + SOCK_BUF_SIZE)]
497

    
498
      sent = SocketOperation(poller, sock, SOCKOP_SEND, data,
499
                             write_timeout)
500

    
501
      # Remove sent bytes
502
      pos += sent
503

    
504
    assert pos == end, "Message wasn't sent completely"
505

    
506
  def _PrepareMessage(self):
507
    """Prepares the HTTP message by setting mandatory headers.
508

509
    """
510
    # RFC2616, section 4.3: "The presence of a message-body in a request is
511
    # signaled by the inclusion of a Content-Length or Transfer-Encoding header
512
    # field in the request's message-headers."
513
    if self._msg.body:
514
      self._msg.headers[HTTP_CONTENT_LENGTH] = len(self._msg.body)
515

    
516
  def _FormatMessage(self):
517
    """Serializes the HTTP message into a string.
518

519
    """
520
    buf = StringIO()
521

    
522
    # Add start line
523
    buf.write(str(self._msg.start_line))
524
    buf.write("\r\n")
525

    
526
    # Add headers
527
    if self._msg.start_line.version != HTTP_0_9:
528
      for name, value in self._msg.headers.iteritems():
529
        buf.write("%s: %s\r\n" % (name, value))
530

    
531
    buf.write("\r\n")
532

    
533
    # Add message body if needed
534
    if self.HasMessageBody():
535
      buf.write(self._msg.body)
536

    
537
    elif self._msg.body:
538
      logging.warning("Ignoring message body")
539

    
540
    return buf.getvalue()
541

    
542
  def HasMessageBody(self):
543
    """Checks whether the HTTP message contains a body.
544

545
    Can be overriden by subclasses.
546

547
    """
548
    return bool(self._msg.body)
549

    
550

    
551
class HttpMessageReader(object):
552
  """Reads HTTP message from socket.
553

554
  """
555
  # Length limits
556
  START_LINE_LENGTH_MAX = None
557
  HEADER_LENGTH_MAX = None
558

    
559
  # Parser state machine
560
  PS_START_LINE = "start-line"
561
  PS_HEADERS = "headers"
562
  PS_BODY = "entity-body"
563
  PS_COMPLETE = "complete"
564

    
565
  def __init__(self, sock, msg, read_timeout):
566
    """Reads an HTTP message from a socket.
567

568
    @type sock: socket
569
    @param sock: Socket to be read from
570
    @type msg: http.HttpMessage
571
    @param msg: Object for the read message
572
    @type read_timeout: float
573
    @param read_timeout: Read timeout for socket
574

575
    """
576
    self.sock = sock
577
    self.msg = msg
578

    
579
    self.poller = select.poll()
580
    self.start_line_buffer = None
581
    self.header_buffer = StringIO()
582
    self.body_buffer = StringIO()
583
    self.parser_status = self.PS_START_LINE
584
    self.content_length = None
585
    self.peer_will_close = None
586

    
587
    buf = ""
588
    eof = False
589
    while self.parser_status != self.PS_COMPLETE:
590
      data = SocketOperation(self.poller, sock, SOCKOP_RECV, SOCK_BUF_SIZE,
591
                             read_timeout)
592

    
593
      if data:
594
        buf += data
595
      else:
596
        eof = True
597

    
598
      # Do some parsing and error checking while more data arrives
599
      buf = self._ContinueParsing(buf, eof)
600

    
601
      # Must be done only after the buffer has been evaluated
602
      # TODO: Connection-length < len(data read) and connection closed
603
      if (eof and
604
          self.parser_status in (self.PS_START_LINE,
605
                                 self.PS_HEADERS)):
606
        raise HttpError("Connection closed prematurely")
607

    
608
    # Parse rest
609
    buf = self._ContinueParsing(buf, True)
610

    
611
    assert self.parser_status == self.PS_COMPLETE
612
    assert not buf, "Parser didn't read full response"
613

    
614
    msg.body = self.body_buffer.getvalue()
615

    
616
    # TODO: Content-type, error handling
617
    if msg.body:
618
      msg.decoded_body = HttpJsonConverter().Decode(msg.body)
619
    else:
620
      msg.decoded_body = None
621

    
622
    if msg.decoded_body:
623
      logging.debug("Message body: %s", msg.decoded_body)
624

    
625
  def _ContinueParsing(self, buf, eof):
626
    """Main function for HTTP message state machine.
627

628
    @type buf: string
629
    @param buf: Receive buffer
630
    @type eof: bool
631
    @param eof: Whether we've reached EOF on the socket
632
    @rtype: string
633
    @return: Updated receive buffer
634

635
    """
636
    if self.parser_status == self.PS_START_LINE:
637
      # Expect start line
638
      while True:
639
        idx = buf.find("\r\n")
640

    
641
        # RFC2616, section 4.1: "In the interest of robustness, servers SHOULD
642
        # ignore any empty line(s) received where a Request-Line is expected.
643
        # In other words, if the server is reading the protocol stream at the
644
        # beginning of a message and receives a CRLF first, it should ignore
645
        # the CRLF."
646
        if idx == 0:
647
          # TODO: Limit number of CRLFs/empty lines for safety?
648
          buf = buf[:2]
649
          continue
650

    
651
        if idx > 0:
652
          self.start_line_buffer = buf[:idx]
653

    
654
          self._CheckStartLineLength(len(self.start_line_buffer))
655

    
656
          # Remove status line, including CRLF
657
          buf = buf[idx + 2:]
658

    
659
          self.msg.start_line = self.ParseStartLine(self.start_line_buffer)
660

    
661
          self.parser_status = self.PS_HEADERS
662
        else:
663
          # Check whether incoming data is getting too large, otherwise we just
664
          # fill our read buffer.
665
          self._CheckStartLineLength(len(buf))
666

    
667
        break
668

    
669
    # TODO: Handle messages without headers
670
    if self.parser_status == self.PS_HEADERS:
671
      # Wait for header end
672
      idx = buf.find("\r\n\r\n")
673
      if idx >= 0:
674
        self.header_buffer.write(buf[:idx + 2])
675

    
676
        self._CheckHeaderLength(self.header_buffer.tell())
677

    
678
        # Remove headers, including CRLF
679
        buf = buf[idx + 4:]
680

    
681
        self._ParseHeaders()
682

    
683
        self.parser_status = self.PS_BODY
684
      else:
685
        # Check whether incoming data is getting too large, otherwise we just
686
        # fill our read buffer.
687
        self._CheckHeaderLength(len(buf))
688

    
689
    if self.parser_status == self.PS_BODY:
690
      # TODO: Implement max size for body_buffer
691
      self.body_buffer.write(buf)
692
      buf = ""
693

    
694
      # Check whether we've read everything
695
      #
696
      # RFC2616, section 4.4: "When a message-body is included with a message,
697
      # the transfer-length of that body is determined by one of the following
698
      # [...] 5. By the server closing the connection. (Closing the connection
699
      # cannot be used to indicate the end of a request body, since that would
700
      # leave no possibility for the server to send back a response.)"
701
      if (eof or
702
          self.content_length is None or
703
          (self.content_length is not None and
704
           self.body_buffer.tell() >= self.content_length)):
705
        self.parser_status = self.PS_COMPLETE
706

    
707
    return buf
708

    
709
  def _CheckStartLineLength(self, length):
710
    """Limits the start line buffer size.
711

712
    @type length: int
713
    @param length: Buffer size
714

715
    """
716
    if (self.START_LINE_LENGTH_MAX is not None and
717
        length > self.START_LINE_LENGTH_MAX):
718
      raise HttpError("Start line longer than %d chars" %
719
                       self.START_LINE_LENGTH_MAX)
720

    
721
  def _CheckHeaderLength(self, length):
722
    """Limits the header buffer size.
723

724
    @type length: int
725
    @param length: Buffer size
726

727
    """
728
    if (self.HEADER_LENGTH_MAX is not None and
729
        length > self.HEADER_LENGTH_MAX):
730
      raise HttpError("Headers longer than %d chars" % self.HEADER_LENGTH_MAX)
731

    
732
  def ParseStartLine(self, start_line):
733
    """Parses the start line of a message.
734

735
    Must be overriden by subclass.
736

737
    @type start_line: string
738
    @param start_line: Start line string
739

740
    """
741
    raise NotImplementedError()
742

    
743
  def _WillPeerCloseConnection(self):
744
    """Evaluate whether peer will close the connection.
745

746
    @rtype: bool
747
    @return: Whether peer will close the connection
748

749
    """
750
    # RFC2616, section 14.10: "HTTP/1.1 defines the "close" connection option
751
    # for the sender to signal that the connection will be closed after
752
    # completion of the response. For example,
753
    #
754
    #        Connection: close
755
    #
756
    # in either the request or the response header fields indicates that the
757
    # connection SHOULD NOT be considered `persistent' (section 8.1) after the
758
    # current request/response is complete."
759

    
760
    hdr_connection = self.msg.headers.get(HTTP_CONNECTION, None)
761
    if hdr_connection:
762
      hdr_connection = hdr_connection.lower()
763

    
764
    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
765
    if self.msg.start_line.version == HTTP_1_1:
766
      return (hdr_connection and "close" in hdr_connection)
767

    
768
    # Some HTTP/1.0 implementations have support for persistent connections,
769
    # using rules different than HTTP/1.1.
770

    
771
    # For older HTTP, Keep-Alive indicates persistent connection.
772
    if self.msg.headers.get(HTTP_KEEP_ALIVE):
773
      return False
774

    
775
    # At least Akamai returns a "Connection: Keep-Alive" header, which was
776
    # supposed to be sent by the client.
777
    if hdr_connection and "keep-alive" in hdr_connection:
778
      return False
779

    
780
    return True
781

    
782
  def _ParseHeaders(self):
783
    """Parses the headers.
784

785
    This function also adjusts internal variables based on header values.
786

787
    RFC2616, section 4.3: "The presence of a message-body in a request is
788
    signaled by the inclusion of a Content-Length or Transfer-Encoding header
789
    field in the request's message-headers."
790

791
    """
792
    # Parse headers
793
    self.header_buffer.seek(0, 0)
794
    self.msg.headers = mimetools.Message(self.header_buffer, 0)
795

    
796
    self.peer_will_close = self._WillPeerCloseConnection()
797

    
798
    # Do we have a Content-Length header?
799
    hdr_content_length = self.msg.headers.get(HTTP_CONTENT_LENGTH, None)
800
    if hdr_content_length:
801
      try:
802
        self.content_length = int(hdr_content_length)
803
      except ValueError:
804
        self.content_length = None
805
      if self.content_length is not None and self.content_length < 0:
806
        self.content_length = None
807

    
808
    # if the connection remains open and a content-length was not provided,
809
    # then assume that the connection WILL close.
810
    if self.content_length is None:
811
      self.peer_will_close = True