Statistics
| Branch: | Tag: | Revision:

root / lib / http / __init__.py @ 1122eb25

History | View | Annotate | Download (27.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP module.
22

23
"""
24

    
25
import logging
26
import mimetools
27
import OpenSSL
28
import select
29
import socket
30
import errno
31

    
32
from cStringIO import StringIO
33

    
34
from ganeti import constants
35
from ganeti import serializer
36
from ganeti import utils
37

    
38

    
39
HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
40

    
41
HTTP_OK = 200
42
HTTP_NO_CONTENT = 204
43
HTTP_NOT_MODIFIED = 304
44

    
45
HTTP_0_9 = "HTTP/0.9"
46
HTTP_1_0 = "HTTP/1.0"
47
HTTP_1_1 = "HTTP/1.1"
48

    
49
HTTP_GET = "GET"
50
HTTP_HEAD = "HEAD"
51
HTTP_POST = "POST"
52
HTTP_PUT = "PUT"
53
HTTP_DELETE = "DELETE"
54

    
55
HTTP_ETAG = "ETag"
56
HTTP_HOST = "Host"
57
HTTP_SERVER = "Server"
58
HTTP_DATE = "Date"
59
HTTP_USER_AGENT = "User-Agent"
60
HTTP_CONTENT_TYPE = "Content-Type"
61
HTTP_CONTENT_LENGTH = "Content-Length"
62
HTTP_CONNECTION = "Connection"
63
HTTP_KEEP_ALIVE = "Keep-Alive"
64
HTTP_WWW_AUTHENTICATE = "WWW-Authenticate"
65
HTTP_AUTHORIZATION = "Authorization"
66
HTTP_AUTHENTICATION_INFO = "Authentication-Info"
67
HTTP_ALLOW = "Allow"
68

    
69
_SSL_UNEXPECTED_EOF = "Unexpected EOF"
70

    
71
# Socket operations
72
(SOCKOP_SEND,
73
 SOCKOP_RECV,
74
 SOCKOP_SHUTDOWN,
75
 SOCKOP_HANDSHAKE) = range(4)
76

    
77
# send/receive quantum
78
SOCK_BUF_SIZE = 32768
79

    
80

    
81
class HttpError(Exception):
82
  """Internal exception for HTTP errors.
83

84
  This should only be used for internal error reporting.
85

86
  """
87

    
88

    
89
class HttpConnectionClosed(Exception):
90
  """Internal exception for a closed connection.
91

92
  This should only be used for internal error reporting. Only use
93
  it if there's no other way to report this condition.
94

95
  """
96

    
97

    
98
class HttpSessionHandshakeUnexpectedEOF(HttpError):
99
  """Internal exception for errors during SSL handshake.
100

101
  This should only be used for internal error reporting.
102

103
  """
104

    
105

    
106
class HttpSocketTimeout(Exception):
107
  """Internal exception for socket timeouts.
108

109
  This should only be used for internal error reporting.
110

111
  """
112

    
113

    
114
class HttpException(Exception):
115
  code = None
116
  message = None
117

    
118
  def __init__(self, message=None, headers=None):
119
    Exception.__init__(self)
120
    self.message = message
121
    self.headers = headers
122

    
123

    
124
class HttpBadRequest(HttpException):
125
  """400 Bad Request
126

127
  RFC2616, 10.4.1: The request could not be understood by the server
128
  due to malformed syntax. The client SHOULD NOT repeat the request
129
  without modifications.
130

131
  """
132
  code = 400
133

    
134

    
135
class HttpUnauthorized(HttpException):
136
  """401 Unauthorized
137

138
  RFC2616, section 10.4.2: The request requires user
139
  authentication. The response MUST include a WWW-Authenticate header
140
  field (section 14.47) containing a challenge applicable to the
141
  requested resource.
142

143
  """
144
  code = 401
145

    
146

    
147
class HttpForbidden(HttpException):
148
  """403 Forbidden
149

150
  RFC2616, 10.4.4: The server understood the request, but is refusing
151
  to fulfill it.  Authorization will not help and the request SHOULD
152
  NOT be repeated.
153

154
  """
155
  code = 403
156

    
157

    
158
class HttpNotFound(HttpException):
159
  """404 Not Found
160

161
  RFC2616, 10.4.5: The server has not found anything matching the
162
  Request-URI.  No indication is given of whether the condition is
163
  temporary or permanent.
164

165
  """
166
  code = 404
167

    
168

    
169
class HttpMethodNotAllowed(HttpException):
170
  """405 Method Not Allowed
171

172
  RFC2616, 10.4.6: The method specified in the Request-Line is not
173
  allowed for the resource identified by the Request-URI. The response
174
  MUST include an Allow header containing a list of valid methods for
175
  the requested resource.
176

177
  """
178
  code = 405
179

    
180

    
181
class HttpRequestTimeout(HttpException):
182
  """408 Request Timeout
183

184
  RFC2616, 10.4.9: The client did not produce a request within the
185
  time that the server was prepared to wait. The client MAY repeat the
186
  request without modifications at any later time.
187

188
  """
189
  code = 408
190

    
191

    
192
class HttpConflict(HttpException):
193
  """409 Conflict
194

195
  RFC2616, 10.4.10: The request could not be completed due to a
196
  conflict with the current state of the resource. This code is only
197
  allowed in situations where it is expected that the user might be
198
  able to resolve the conflict and resubmit the request.
199

200
  """
201
  code = 409
202

    
203

    
204
class HttpGone(HttpException):
205
  """410 Gone
206

207
  RFC2616, 10.4.11: The requested resource is no longer available at
208
  the server and no forwarding address is known. This condition is
209
  expected to be considered permanent.
210

211
  """
212
  code = 410
213

    
214

    
215
class HttpLengthRequired(HttpException):
216
  """411 Length Required
217

218
  RFC2616, 10.4.12: The server refuses to accept the request without a
219
  defined Content-Length. The client MAY repeat the request if it adds
220
  a valid Content-Length header field containing the length of the
221
  message-body in the request message.
222

223
  """
224
  code = 411
225

    
226

    
227
class HttpPreconditionFailed(HttpException):
228
  """412 Precondition Failed
229

230
  RFC2616, 10.4.13: The precondition given in one or more of the
231
  request-header fields evaluated to false when it was tested on the
232
  server.
233

234
  """
235
  code = 412
236

    
237

    
238
class HttpInternalServerError(HttpException):
239
  """500 Internal Server Error
240

241
  RFC2616, 10.5.1: The server encountered an unexpected condition
242
  which prevented it from fulfilling the request.
243

244
  """
245
  code = 500
246

    
247

    
248
class HttpNotImplemented(HttpException):
249
  """501 Not Implemented
250

251
  RFC2616, 10.5.2: The server does not support the functionality
252
  required to fulfill the request.
253

254
  """
255
  code = 501
256

    
257

    
258
class HttpBadGateway(HttpException):
259
  """502 Bad Gateway
260

261
  RFC2616, 10.5.3: The server, while acting as a gateway or proxy,
262
  received an invalid response from the upstream server it accessed in
263
  attempting to fulfill the request.
264

265
  """
266
  code = 502
267

    
268

    
269
class HttpServiceUnavailable(HttpException):
270
  """503 Service Unavailable
271

272
  RFC2616, 10.5.4: The server is currently unable to handle the
273
  request due to a temporary overloading or maintenance of the server.
274

275
  """
276
  code = 503
277

    
278

    
279
class HttpGatewayTimeout(HttpException):
280
  """504 Gateway Timeout
281

282
  RFC2616, 10.5.5: The server, while acting as a gateway or proxy, did
283
  not receive a timely response from the upstream server specified by
284
  the URI (e.g.  HTTP, FTP, LDAP) or some other auxiliary server
285
  (e.g. DNS) it needed to access in attempting to complete the
286
  request.
287

288
  """
289
  code = 504
290

    
291

    
292
class HttpVersionNotSupported(HttpException):
293
  """505 HTTP Version Not Supported
294

295
  RFC2616, 10.5.6: The server does not support, or refuses to support,
296
  the HTTP protocol version that was used in the request message.
297

298
  """
299
  code = 505
300

    
301

    
302
class HttpJsonConverter:
303
  CONTENT_TYPE = "application/json"
304

    
305
  def Encode(self, data):
306
    return serializer.DumpJson(data)
307

    
308
  def Decode(self, data):
309
    return serializer.LoadJson(data)
310

    
311

    
312
def WaitForSocketCondition(sock, event, timeout):
313
  """Waits for a condition to occur on the socket.
314

315
  @type sock: socket
316
  @param sock: Wait for events on this socket
317
  @type event: int
318
  @param event: ORed condition (see select module)
319
  @type timeout: float or None
320
  @param timeout: Timeout in seconds
321
  @rtype: int or None
322
  @return: None for timeout, otherwise occured conditions
323

324
  """
325
  check = (event | select.POLLPRI |
326
           select.POLLNVAL | select.POLLHUP | select.POLLERR)
327

    
328
  if timeout is not None:
329
    # Poller object expects milliseconds
330
    timeout *= 1000
331

    
332
  poller = select.poll()
333
  poller.register(sock, event)
334
  try:
335
    while True:
336
      # TODO: If the main thread receives a signal and we have no timeout, we
337
      # could wait forever. This should check a global "quit" flag or
338
      # something every so often.
339
      io_events = poller.poll(timeout)
340
      if not io_events:
341
        # Timeout
342
        return None
343
      for (_, evcond) in io_events:
344
        if evcond & check:
345
          return evcond
346
  finally:
347
    poller.unregister(sock)
348

    
349

    
350
def SocketOperation(sock, op, arg1, timeout):
351
  """Wrapper around socket functions.
352

353
  This function abstracts error handling for socket operations, especially
354
  for the complicated interaction with OpenSSL.
355

356
  @type sock: socket
357
  @param sock: Socket for the operation
358
  @type op: int
359
  @param op: Operation to execute (SOCKOP_* constants)
360
  @type arg1: any
361
  @param arg1: Parameter for function (if needed)
362
  @type timeout: None or float
363
  @param timeout: Timeout in seconds or None
364
  @return: Return value of socket function
365

366
  """
367
  # TODO: event_poll/event_check/override
368
  if op in (SOCKOP_SEND, SOCKOP_HANDSHAKE):
369
    event_poll = select.POLLOUT
370

    
371
  elif op == SOCKOP_RECV:
372
    event_poll = select.POLLIN
373

    
374
  elif op == SOCKOP_SHUTDOWN:
375
    event_poll = None
376

    
377
    # The timeout is only used when OpenSSL requests polling for a condition.
378
    # It is not advisable to have no timeout for shutdown.
379
    assert timeout
380

    
381
  else:
382
    raise AssertionError("Invalid socket operation")
383

    
384
  # Handshake is only supported by SSL sockets
385
  if (op == SOCKOP_HANDSHAKE and
386
      not isinstance(sock, OpenSSL.SSL.ConnectionType)):
387
    return
388

    
389
  # No override by default
390
  event_override = 0
391

    
392
  while True:
393
    # Poll only for certain operations and when asked for by an override
394
    if event_override or op in (SOCKOP_SEND, SOCKOP_RECV, SOCKOP_HANDSHAKE):
395
      if event_override:
396
        wait_for_event = event_override
397
      else:
398
        wait_for_event = event_poll
399

    
400
      event = WaitForSocketCondition(sock, wait_for_event, timeout)
401
      if event is None:
402
        raise HttpSocketTimeout()
403

    
404
      if (op == SOCKOP_RECV and
405
          event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
406
        return ""
407

    
408
      if not event & wait_for_event:
409
        continue
410

    
411
    # Reset override
412
    event_override = 0
413

    
414
    try:
415
      try:
416
        if op == SOCKOP_SEND:
417
          return sock.send(arg1)
418

    
419
        elif op == SOCKOP_RECV:
420
          return sock.recv(arg1)
421

    
422
        elif op == SOCKOP_SHUTDOWN:
423
          if isinstance(sock, OpenSSL.SSL.ConnectionType):
424
            # PyOpenSSL's shutdown() doesn't take arguments
425
            return sock.shutdown()
426
          else:
427
            return sock.shutdown(arg1)
428

    
429
        elif op == SOCKOP_HANDSHAKE:
430
          return sock.do_handshake()
431

    
432
      except OpenSSL.SSL.WantWriteError:
433
        # OpenSSL wants to write, poll for POLLOUT
434
        event_override = select.POLLOUT
435
        continue
436

    
437
      except OpenSSL.SSL.WantReadError:
438
        # OpenSSL wants to read, poll for POLLIN
439
        event_override = select.POLLIN | select.POLLPRI
440
        continue
441

    
442
      except OpenSSL.SSL.WantX509LookupError:
443
        continue
444

    
445
      except OpenSSL.SSL.ZeroReturnError, err:
446
        # SSL Connection has been closed. In SSL 3.0 and TLS 1.0, this only
447
        # occurs if a closure alert has occurred in the protocol, i.e. the
448
        # connection has been closed cleanly. Note that this does not
449
        # necessarily mean that the transport layer (e.g. a socket) has been
450
        # closed.
451
        if op == SOCKOP_SEND:
452
          # Can happen during a renegotiation
453
          raise HttpConnectionClosed(err.args)
454
        elif op == SOCKOP_RECV:
455
          return ""
456

    
457
        # SSL_shutdown shouldn't return SSL_ERROR_ZERO_RETURN
458
        raise socket.error(err.args)
459

    
460
      except OpenSSL.SSL.SysCallError, err:
461
        if op == SOCKOP_SEND:
462
          # arg1 is the data when writing
463
          if err.args and err.args[0] == -1 and arg1 == "":
464
            # errors when writing empty strings are expected
465
            # and can be ignored
466
            return 0
467

    
468
        if err.args == (-1, _SSL_UNEXPECTED_EOF):
469
          if op == SOCKOP_RECV:
470
            return ""
471
          elif op == SOCKOP_HANDSHAKE:
472
            # Can happen if peer disconnects directly after the connection is
473
            # opened.
474
            raise HttpSessionHandshakeUnexpectedEOF(err.args)
475

    
476
        raise socket.error(err.args)
477

    
478
      except OpenSSL.SSL.Error, err:
479
        raise socket.error(err.args)
480

    
481
    except socket.error, err:
482
      if err.args and err.args[0] == errno.EAGAIN:
483
        # Ignore EAGAIN
484
        continue
485

    
486
      raise
487

    
488

    
489
def ShutdownConnection(sock, close_timeout, write_timeout, msgreader, force):
490
  """Closes the connection.
491

492
  @type sock: socket
493
  @param sock: Socket to be shut down
494
  @type close_timeout: float
495
  @param close_timeout: How long to wait for the peer to close
496
      the connection
497
  @type write_timeout: float
498
  @param write_timeout: Write timeout for shutdown
499
  @type msgreader: http.HttpMessageReader
500
  @param msgreader: Request message reader, used to determine whether
501
      peer should close connection
502
  @type force: bool
503
  @param force: Whether to forcibly close the connection without
504
      waiting for peer
505

506
  """
507
  #print msgreader.peer_will_close, force
508
  if msgreader and msgreader.peer_will_close and not force:
509
    # Wait for peer to close
510
    try:
511
      # Check whether it's actually closed
512
      if not SocketOperation(sock, SOCKOP_RECV, 1, close_timeout):
513
        return
514
    except (socket.error, HttpError, HttpSocketTimeout):
515
      # Ignore errors at this stage
516
      pass
517

    
518
  # Close the connection from our side
519
  try:
520
    # We don't care about the return value, see NOTES in SSL_shutdown(3).
521
    SocketOperation(sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
522
                    write_timeout)
523
  except HttpSocketTimeout:
524
    raise HttpError("Timeout while shutting down connection")
525
  except socket.error, err:
526
    # Ignore ENOTCONN
527
    if not (err.args and err.args[0] == errno.ENOTCONN):
528
      raise HttpError("Error while shutting down connection: %s" % err)
529

    
530

    
531
def Handshake(sock, write_timeout):
532
  """Shakes peer's hands.
533

534
  @type sock: socket
535
  @param sock: Socket to be shut down
536
  @type write_timeout: float
537
  @param write_timeout: Write timeout for handshake
538

539
  """
540
  try:
541
    return SocketOperation(sock, SOCKOP_HANDSHAKE, None, write_timeout)
542
  except HttpSocketTimeout:
543
    raise HttpError("Timeout during SSL handshake")
544
  except socket.error, err:
545
    raise HttpError("Error in SSL handshake: %s" % err)
546

    
547

    
548
def InitSsl():
549
  """Initializes the SSL infrastructure.
550

551
  This function is idempotent.
552

553
  """
554
  if not OpenSSL.rand.status():
555
    raise EnvironmentError("OpenSSL could not collect enough entropy"
556
                           " for the PRNG")
557

    
558
  # TODO: Maybe add some additional seeding for OpenSSL's PRNG
559

    
560

    
561
class HttpSslParams(object):
562
  """Data class for SSL key and certificate.
563

564
  """
565
  def __init__(self, ssl_key_path, ssl_cert_path):
566
    """Initializes this class.
567

568
    @type ssl_key_path: string
569
    @param ssl_key_path: Path to file containing SSL key in PEM format
570
    @type ssl_cert_path: string
571
    @param ssl_cert_path: Path to file containing SSL certificate
572
        in PEM format
573

574
    """
575
    self.ssl_key_pem = utils.ReadFile(ssl_key_path)
576
    self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
577

    
578
  def GetKey(self):
579
    return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
580
                                          self.ssl_key_pem)
581

    
582
  def GetCertificate(self):
583
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
584
                                           self.ssl_cert_pem)
585

    
586

    
587
class HttpBase(object):
588
  """Base class for HTTP server and client.
589

590
  """
591
  def __init__(self):
592
    self.using_ssl = None
593
    self._ssl_params = None
594
    self._ssl_key = None
595
    self._ssl_cert = None
596

    
597
  def _CreateSocket(self, ssl_params, ssl_verify_peer):
598
    """Creates a TCP socket and initializes SSL if needed.
599

600
    @type ssl_params: HttpSslParams
601
    @param ssl_params: SSL key and certificate
602
    @type ssl_verify_peer: bool
603
    @param ssl_verify_peer: Whether to require client certificate
604
        and compare it with our certificate
605

606
    """
607
    self._ssl_params = ssl_params
608

    
609
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
610

    
611
    # Should we enable SSL?
612
    self.using_ssl = ssl_params is not None
613

    
614
    if not self.using_ssl:
615
      return sock
616

    
617
    self._ssl_key = ssl_params.GetKey()
618
    self._ssl_cert = ssl_params.GetCertificate()
619

    
620
    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
621
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
622

    
623
    ctx.use_privatekey(self._ssl_key)
624
    ctx.use_certificate(self._ssl_cert)
625
    ctx.check_privatekey()
626

    
627
    if ssl_verify_peer:
628
      ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
629
                     OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
630
                     self._SSLVerifyCallback)
631

    
632
    return OpenSSL.SSL.Connection(ctx, sock)
633

    
634
  def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
635
    """Verify the certificate provided by the peer
636

637
    We only compare fingerprints. The client must use the same certificate as
638
    we do on our side.
639

640
    """
641
    assert self._ssl_params, "SSL not initialized"
642

    
643
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
644
            self._ssl_cert.digest("md5") == cert.digest("md5"))
645

    
646

    
647
class HttpMessage(object):
648
  """Data structure for HTTP message.
649

650
  """
651
  def __init__(self):
652
    self.start_line = None
653
    self.headers = None
654
    self.body = None
655
    self.decoded_body = None
656

    
657

    
658
class HttpClientToServerStartLine(object):
659
  """Data structure for HTTP request start line.
660

661
  """
662
  def __init__(self, method, path, version):
663
    self.method = method
664
    self.path = path
665
    self.version = version
666

    
667
  def __str__(self):
668
    return "%s %s %s" % (self.method, self.path, self.version)
669

    
670

    
671
class HttpServerToClientStartLine(object):
672
  """Data structure for HTTP response start line.
673

674
  """
675
  def __init__(self, version, code, reason):
676
    self.version = version
677
    self.code = code
678
    self.reason = reason
679

    
680
  def __str__(self):
681
    return "%s %s %s" % (self.version, self.code, self.reason)
682

    
683

    
684
class HttpMessageWriter(object):
685
  """Writes an HTTP message to a socket.
686

687
  """
688
  def __init__(self, sock, msg, write_timeout):
689
    """Initializes this class and writes an HTTP message to a socket.
690

691
    @type sock: socket
692
    @param sock: Socket to be written to
693
    @type msg: http.HttpMessage
694
    @param msg: HTTP message to be written
695
    @type write_timeout: float
696
    @param write_timeout: Write timeout for socket
697

698
    """
699
    self._msg = msg
700

    
701
    self._PrepareMessage()
702

    
703
    buf = self._FormatMessage()
704

    
705
    pos = 0
706
    end = len(buf)
707
    while pos < end:
708
      # Send only SOCK_BUF_SIZE bytes at a time
709
      data = buf[pos:(pos + SOCK_BUF_SIZE)]
710

    
711
      sent = SocketOperation(sock, SOCKOP_SEND, data, write_timeout)
712

    
713
      # Remove sent bytes
714
      pos += sent
715

    
716
    assert pos == end, "Message wasn't sent completely"
717

    
718
  def _PrepareMessage(self):
719
    """Prepares the HTTP message by setting mandatory headers.
720

721
    """
722
    # RFC2616, section 4.3: "The presence of a message-body in a request is
723
    # signaled by the inclusion of a Content-Length or Transfer-Encoding header
724
    # field in the request's message-headers."
725
    if self._msg.body:
726
      self._msg.headers[HTTP_CONTENT_LENGTH] = len(self._msg.body)
727

    
728
  def _FormatMessage(self):
729
    """Serializes the HTTP message into a string.
730

731
    """
732
    buf = StringIO()
733

    
734
    # Add start line
735
    buf.write(str(self._msg.start_line))
736
    buf.write("\r\n")
737

    
738
    # Add headers
739
    if self._msg.start_line.version != HTTP_0_9:
740
      for name, value in self._msg.headers.iteritems():
741
        buf.write("%s: %s\r\n" % (name, value))
742

    
743
    buf.write("\r\n")
744

    
745
    # Add message body if needed
746
    if self.HasMessageBody():
747
      buf.write(self._msg.body)
748

    
749
    elif self._msg.body:
750
      logging.warning("Ignoring message body")
751

    
752
    return buf.getvalue()
753

    
754
  def HasMessageBody(self):
755
    """Checks whether the HTTP message contains a body.
756

757
    Can be overridden by subclasses.
758

759
    """
760
    return bool(self._msg.body)
761

    
762

    
763
class HttpMessageReader(object):
764
  """Reads HTTP message from socket.
765

766
  """
767
  # Length limits
768
  START_LINE_LENGTH_MAX = None
769
  HEADER_LENGTH_MAX = None
770

    
771
  # Parser state machine
772
  PS_START_LINE = "start-line"
773
  PS_HEADERS = "headers"
774
  PS_BODY = "entity-body"
775
  PS_COMPLETE = "complete"
776

    
777
  def __init__(self, sock, msg, read_timeout):
778
    """Reads an HTTP message from a socket.
779

780
    @type sock: socket
781
    @param sock: Socket to be read from
782
    @type msg: http.HttpMessage
783
    @param msg: Object for the read message
784
    @type read_timeout: float
785
    @param read_timeout: Read timeout for socket
786

787
    """
788
    self.sock = sock
789
    self.msg = msg
790

    
791
    self.start_line_buffer = None
792
    self.header_buffer = StringIO()
793
    self.body_buffer = StringIO()
794
    self.parser_status = self.PS_START_LINE
795
    self.content_length = None
796
    self.peer_will_close = None
797

    
798
    buf = ""
799
    eof = False
800
    while self.parser_status != self.PS_COMPLETE:
801
      # TODO: Don't read more than necessary (Content-Length), otherwise
802
      # data might be lost and/or an error could occur
803
      data = SocketOperation(sock, SOCKOP_RECV, SOCK_BUF_SIZE, read_timeout)
804

    
805
      if data:
806
        buf += data
807
      else:
808
        eof = True
809

    
810
      # Do some parsing and error checking while more data arrives
811
      buf = self._ContinueParsing(buf, eof)
812

    
813
      # Must be done only after the buffer has been evaluated
814
      # TODO: Connection-length < len(data read) and connection closed
815
      if (eof and
816
          self.parser_status in (self.PS_START_LINE,
817
                                 self.PS_HEADERS)):
818
        raise HttpError("Connection closed prematurely")
819

    
820
    # Parse rest
821
    buf = self._ContinueParsing(buf, True)
822

    
823
    assert self.parser_status == self.PS_COMPLETE
824
    assert not buf, "Parser didn't read full response"
825

    
826
    msg.body = self.body_buffer.getvalue()
827

    
828
    # TODO: Content-type, error handling
829
    if msg.body:
830
      msg.decoded_body = HttpJsonConverter().Decode(msg.body)
831
    else:
832
      msg.decoded_body = None
833

    
834
    if msg.decoded_body:
835
      logging.debug("Message body: %s", msg.decoded_body)
836

    
837
  def _ContinueParsing(self, buf, eof):
838
    """Main function for HTTP message state machine.
839

840
    @type buf: string
841
    @param buf: Receive buffer
842
    @type eof: bool
843
    @param eof: Whether we've reached EOF on the socket
844
    @rtype: string
845
    @return: Updated receive buffer
846

847
    """
848
    # TODO: Use offset instead of slicing when possible
849
    if self.parser_status == self.PS_START_LINE:
850
      # Expect start line
851
      while True:
852
        idx = buf.find("\r\n")
853

    
854
        # RFC2616, section 4.1: "In the interest of robustness, servers SHOULD
855
        # ignore any empty line(s) received where a Request-Line is expected.
856
        # In other words, if the server is reading the protocol stream at the
857
        # beginning of a message and receives a CRLF first, it should ignore
858
        # the CRLF."
859
        if idx == 0:
860
          # TODO: Limit number of CRLFs/empty lines for safety?
861
          buf = buf[:2]
862
          continue
863

    
864
        if idx > 0:
865
          self.start_line_buffer = buf[:idx]
866

    
867
          self._CheckStartLineLength(len(self.start_line_buffer))
868

    
869
          # Remove status line, including CRLF
870
          buf = buf[idx + 2:]
871

    
872
          self.msg.start_line = self.ParseStartLine(self.start_line_buffer)
873

    
874
          self.parser_status = self.PS_HEADERS
875
        else:
876
          # Check whether incoming data is getting too large, otherwise we just
877
          # fill our read buffer.
878
          self._CheckStartLineLength(len(buf))
879

    
880
        break
881

    
882
    # TODO: Handle messages without headers
883
    if self.parser_status == self.PS_HEADERS:
884
      # Wait for header end
885
      idx = buf.find("\r\n\r\n")
886
      if idx >= 0:
887
        self.header_buffer.write(buf[:idx + 2])
888

    
889
        self._CheckHeaderLength(self.header_buffer.tell())
890

    
891
        # Remove headers, including CRLF
892
        buf = buf[idx + 4:]
893

    
894
        self._ParseHeaders()
895

    
896
        self.parser_status = self.PS_BODY
897
      else:
898
        # Check whether incoming data is getting too large, otherwise we just
899
        # fill our read buffer.
900
        self._CheckHeaderLength(len(buf))
901

    
902
    if self.parser_status == self.PS_BODY:
903
      # TODO: Implement max size for body_buffer
904
      self.body_buffer.write(buf)
905
      buf = ""
906

    
907
      # Check whether we've read everything
908
      #
909
      # RFC2616, section 4.4: "When a message-body is included with a message,
910
      # the transfer-length of that body is determined by one of the following
911
      # [...] 5. By the server closing the connection. (Closing the connection
912
      # cannot be used to indicate the end of a request body, since that would
913
      # leave no possibility for the server to send back a response.)"
914
      #
915
      # TODO: Error when buffer length > Content-Length header
916
      if (eof or
917
          self.content_length is None or
918
          (self.content_length is not None and
919
           self.body_buffer.tell() >= self.content_length)):
920
        self.parser_status = self.PS_COMPLETE
921

    
922
    return buf
923

    
924
  def _CheckStartLineLength(self, length):
925
    """Limits the start line buffer size.
926

927
    @type length: int
928
    @param length: Buffer size
929

930
    """
931
    if (self.START_LINE_LENGTH_MAX is not None and
932
        length > self.START_LINE_LENGTH_MAX):
933
      raise HttpError("Start line longer than %d chars" %
934
                       self.START_LINE_LENGTH_MAX)
935

    
936
  def _CheckHeaderLength(self, length):
937
    """Limits the header buffer size.
938

939
    @type length: int
940
    @param length: Buffer size
941

942
    """
943
    if (self.HEADER_LENGTH_MAX is not None and
944
        length > self.HEADER_LENGTH_MAX):
945
      raise HttpError("Headers longer than %d chars" % self.HEADER_LENGTH_MAX)
946

    
947
  def ParseStartLine(self, start_line):
948
    """Parses the start line of a message.
949

950
    Must be overridden by subclass.
951

952
    @type start_line: string
953
    @param start_line: Start line string
954

955
    """
956
    raise NotImplementedError()
957

    
958
  def _WillPeerCloseConnection(self):
959
    """Evaluate whether peer will close the connection.
960

961
    @rtype: bool
962
    @return: Whether peer will close the connection
963

964
    """
965
    # RFC2616, section 14.10: "HTTP/1.1 defines the "close" connection option
966
    # for the sender to signal that the connection will be closed after
967
    # completion of the response. For example,
968
    #
969
    #        Connection: close
970
    #
971
    # in either the request or the response header fields indicates that the
972
    # connection SHOULD NOT be considered `persistent' (section 8.1) after the
973
    # current request/response is complete."
974

    
975
    hdr_connection = self.msg.headers.get(HTTP_CONNECTION, None)
976
    if hdr_connection:
977
      hdr_connection = hdr_connection.lower()
978

    
979
    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
980
    if self.msg.start_line.version == HTTP_1_1:
981
      return (hdr_connection and "close" in hdr_connection)
982

    
983
    # Some HTTP/1.0 implementations have support for persistent connections,
984
    # using rules different than HTTP/1.1.
985

    
986
    # For older HTTP, Keep-Alive indicates persistent connection.
987
    if self.msg.headers.get(HTTP_KEEP_ALIVE):
988
      return False
989

    
990
    # At least Akamai returns a "Connection: Keep-Alive" header, which was
991
    # supposed to be sent by the client.
992
    if hdr_connection and "keep-alive" in hdr_connection:
993
      return False
994

    
995
    return True
996

    
997
  def _ParseHeaders(self):
998
    """Parses the headers.
999

1000
    This function also adjusts internal variables based on header values.
1001

1002
    RFC2616, section 4.3: The presence of a message-body in a request is
1003
    signaled by the inclusion of a Content-Length or Transfer-Encoding header
1004
    field in the request's message-headers.
1005

1006
    """
1007
    # Parse headers
1008
    self.header_buffer.seek(0, 0)
1009
    self.msg.headers = mimetools.Message(self.header_buffer, 0)
1010

    
1011
    self.peer_will_close = self._WillPeerCloseConnection()
1012

    
1013
    # Do we have a Content-Length header?
1014
    hdr_content_length = self.msg.headers.get(HTTP_CONTENT_LENGTH, None)
1015
    if hdr_content_length:
1016
      try:
1017
        self.content_length = int(hdr_content_length)
1018
      except ValueError:
1019
        self.content_length = None
1020
      if self.content_length is not None and self.content_length < 0:
1021
        self.content_length = None
1022

    
1023
    # if the connection remains open and a content-length was not provided,
1024
    # then assume that the connection WILL close.
1025
    if self.content_length is None:
1026
      self.peer_will_close = True