Statistics
| Branch: | Tag: | Revision:

root / lib / http / __init__.py @ 580558a3

History | View | Annotate | Download (28.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP module.
22

23
"""
24

    
25
import logging
26
import mimetools
27
import OpenSSL
28
import select
29
import socket
30
import errno
31

    
32
from cStringIO import StringIO
33

    
34
from ganeti import constants
35
from ganeti import serializer
36
from ganeti import utils
37

    
38

    
39
# Product string built from the Ganeti release version
HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION

# Status codes
HTTP_OK = 200
HTTP_NO_CONTENT = 204
HTTP_NOT_MODIFIED = 304

# Protocol versions
HTTP_0_9 = "HTTP/0.9"
HTTP_1_0 = "HTTP/1.0"
HTTP_1_1 = "HTTP/1.1"

# Request methods
HTTP_GET = "GET"
HTTP_HEAD = "HEAD"
HTTP_POST = "POST"
HTTP_PUT = "PUT"
HTTP_DELETE = "DELETE"

# Header field names
HTTP_ETAG = "ETag"
HTTP_HOST = "Host"
HTTP_SERVER = "Server"
HTTP_DATE = "Date"
HTTP_USER_AGENT = "User-Agent"
HTTP_CONTENT_TYPE = "Content-Type"
HTTP_CONTENT_LENGTH = "Content-Length"
HTTP_CONNECTION = "Connection"
HTTP_KEEP_ALIVE = "Keep-Alive"
HTTP_WWW_AUTHENTICATE = "WWW-Authenticate"
HTTP_AUTHORIZATION = "Authorization"
HTTP_AUTHENTICATION_INFO = "Authentication-Info"
HTTP_ALLOW = "Allow"

# Media types
HTTP_APP_OCTET_STREAM = "application/octet-stream"

# Message compared against the arguments of OpenSSL's SysCallError
_SSL_UNEXPECTED_EOF = "Unexpected EOF"

# Socket operations
SOCKOP_SEND = 0
SOCKOP_RECV = 1
SOCKOP_SHUTDOWN = 2
SOCKOP_HANDSHAKE = 3

# send/receive quantum
SOCK_BUF_SIZE = 32768
81

    
82

    
83
class HttpError(Exception):
  """Generic failure inside the HTTP code.

  Intended for internal error reporting only.

  """
89

    
90

    
91
class HttpConnectionClosed(Exception):
  """Signals that the connection has been closed.

  Intended for internal error reporting only, and only as a last resort
  when the condition can't be reported in any other way.

  """
98

    
99

    
100
class HttpSessionHandshakeUnexpectedEOF(HttpError):
  """Signals a failure that occurred during the SSL handshake.

  Intended for internal error reporting only.

  """
106

    
107

    
108
class HttpSocketTimeout(Exception):
  """Signals that a socket operation timed out.

  Intended for internal error reporting only.

  """
114

    
115

    
116
class HttpException(Exception):
  """Base class for exceptions carrying an HTTP status code.

  @cvar code: Status code; C{None} here, set by subclasses
  @cvar message: Default message, C{None} unless set per instance

  """
  code = None
  message = None

  def __init__(self, message=None, headers=None):
    """Stores message and headers on the exception.

    @param message: Message kept in C{self.message}
    @param headers: Headers kept in C{self.headers}

    """
    # Exception is deliberately initialized without arguments
    Exception.__init__(self)
    self.headers = headers
    self.message = message
124

    
125

    
126
class HttpBadRequest(HttpException):
  """400 Bad Request

  RFC2616, 10.4.1: Malformed syntax kept the server from understanding
  the request. The client should not send it again without changing it.

  """
  code = 400
135

    
136

    
137
class HttpUnauthorized(HttpException):
  """401 Unauthorized

  RFC2616, section 10.4.2: User authentication is needed for the
  request. The response has to carry a WWW-Authenticate header field
  (section 14.47) with a challenge that applies to the requested
  resource.

  """
  code = 401
147

    
148

    
149
class HttpForbidden(HttpException):
  """403 Forbidden

  RFC2616, 10.4.4: The request was understood, yet the server refuses
  to carry it out. Authorization makes no difference and the request
  should not be repeated.

  """
  code = 403
158

    
159

    
160
class HttpNotFound(HttpException):
  """404 Not Found

  RFC2616, 10.4.5: Nothing matching the Request-URI was found by the
  server. Whether that is temporary or permanent is not indicated.

  """
  code = 404
169

    
170

    
171
class HttpMethodNotAllowed(HttpException):
  """405 Method Not Allowed

  RFC2616, 10.4.6: The method given in the Request-Line may not be used
  on the resource named by the Request-URI. The response has to include
  an Allow header listing the methods valid for that resource.

  """
  code = 405
181

    
182

    
183
class HttpNotAcceptable(HttpException):
  """406 Not Acceptable

  RFC2616, 10.4.7: The requested resource can only produce response
  entities whose content characteristics are not acceptable according
  to the accept headers of the request.

  """
  code = 406
192

    
193

    
194
class HttpRequestTimeout(HttpException):
  """408 Request Timeout

  RFC2616, 10.4.9: No request was produced by the client within the
  time the server was willing to wait. The client may send the same
  request again, unmodified, at any later time.

  """
  code = 408
203

    
204

    
205
class HttpConflict(HttpException):
  """409 Conflict

  RFC2616, 10.4.10: The request conflicts with the current state of
  the resource and could not be completed. The code is reserved for
  situations in which the user is expected to be able to resolve the
  conflict and resubmit the request.

  """
  code = 409
215

    
216

    
217
class HttpGone(HttpException):
  """410 Gone

  RFC2616, 10.4.11: The resource is not available at the server
  anymore and no forwarding address is known. The condition is expected
  to be permanent.

  """
  code = 410
226

    
227

    
228
class HttpLengthRequired(HttpException):
  """411 Length Required

  RFC2616, 10.4.12: Without a defined Content-Length the server will
  not accept the request. The client may retry after adding a valid
  Content-Length header field giving the length of the message-body of
  the request.

  """
  code = 411
238

    
239

    
240
class HttpPreconditionFailed(HttpException):
  """412 Precondition Failed

  RFC2616, 10.4.13: A precondition stated in one or more request-header
  fields turned out to be false when the server tested it.

  """
  code = 412
249

    
250

    
251
class HttpUnsupportedMediaType(HttpException):
  """415 Unsupported Media Type

  RFC2616, 10.4.16: The request is refused because its entity is in a
  format which the requested resource doesn't support for the requested
  method.

  """
  code = 415
260

    
261

    
262
class HttpInternalServerError(HttpException):
  """500 Internal Server Error

  RFC2616, 10.5.1: An unexpected condition kept the server from
  fulfilling the request.

  """
  code = 500
270

    
271

    
272
class HttpNotImplemented(HttpException):
  """501 Not Implemented

  RFC2616, 10.5.2: The functionality needed to fulfill the request is
  not supported by the server.

  """
  code = 501
280

    
281

    
282
class HttpBadGateway(HttpException):
  """502 Bad Gateway

  RFC2616, 10.5.3: Acting as a gateway or proxy, the server got an
  invalid response from the upstream server it contacted while trying
  to fulfill the request.

  """
  code = 502
291

    
292

    
293
class HttpServiceUnavailable(HttpException):
  """503 Service Unavailable

  RFC2616, 10.5.4: Temporary overloading or maintenance of the server
  currently prevents it from handling the request.

  """
  code = 503
301

    
302

    
303
class HttpGatewayTimeout(HttpException):
  """504 Gateway Timeout

  RFC2616, 10.5.5: Acting as a gateway or proxy, the server got no
  timely response from the upstream server specified by the URI
  (e.g. HTTP, FTP, LDAP) or from some other auxiliary server (e.g. DNS)
  which it needed to access in order to complete the request.

  """
  code = 504
314

    
315

    
316
class HttpVersionNotSupported(HttpException):
  """505 HTTP Version Not Supported

  RFC2616, 10.5.6: The HTTP protocol version used in the request
  message is not supported by the server, or the server refuses to
  support it.

  """
  code = 505
324

    
325

    
326
class HttpJsonConverter: # pylint: disable-msg=W0232
  """Converter for message bodies of type "application/json".

  Both directions are delegated to the C{serializer} module.

  """
  CONTENT_TYPE = "application/json"

  @staticmethod
  def Encode(data):
    """Returns the result of C{serializer.DumpJson} for C{data}.

    """
    return serializer.DumpJson(data)

  @staticmethod
  def Decode(data):
    """Returns the result of C{serializer.LoadJson} for C{data}.

    """
    return serializer.LoadJson(data)
336

    
337

    
338
def WaitForSocketCondition(sock, event, timeout):
  """Blocks until a condition occurs on the socket or the timeout expires.

  @type sock: socket
  @param sock: Wait for events on this socket
  @type event: int
  @param event: ORed condition (see select module)
  @type timeout: float or None
  @param timeout: Timeout in seconds
  @rtype: int or None
  @return: None for timeout, otherwise occurred conditions

  """
  # Conditions ending the wait: the requested ones, priority data and the
  # error/hangup/invalid conditions
  interesting = (select.POLLERR | select.POLLHUP | select.POLLNVAL |
                 select.POLLPRI | event)

  # The poller works with milliseconds, callers pass seconds
  if timeout is None:
    poll_timeout = None
  else:
    poll_timeout = timeout * 1000

  poller = select.poll()
  poller.register(sock, event)
  try:
    while True:
      # TODO: If the main thread receives a signal and we have no timeout, we
      # could wait forever. This should check a global "quit" flag or
      # something every so often.
      ready = poller.poll(poll_timeout)
      if not ready:
        # Nothing was reported, i.e. the timeout expired
        return None

      for (_, condition) in ready:
        if condition & interesting:
          return condition
  finally:
    poller.unregister(sock)
374

    
375

    
376
def SocketOperation(sock, op, arg1, timeout):
377
  """Wrapper around socket functions.
378

379
  This function abstracts error handling for socket operations, especially
380
  for the complicated interaction with OpenSSL.
381

382
  @type sock: socket
383
  @param sock: Socket for the operation
384
  @type op: int
385
  @param op: Operation to execute (SOCKOP_* constants)
386
  @type arg1: any
387
  @param arg1: Parameter for function (if needed)
388
  @type timeout: None or float
389
  @param timeout: Timeout in seconds or None
390
  @return: Return value of socket function
391

392
  """
393
  # TODO: event_poll/event_check/override
394
  if op in (SOCKOP_SEND, SOCKOP_HANDSHAKE):
395
    event_poll = select.POLLOUT
396

    
397
  elif op == SOCKOP_RECV:
398
    event_poll = select.POLLIN
399

    
400
  elif op == SOCKOP_SHUTDOWN:
401
    event_poll = None
402

    
403
    # The timeout is only used when OpenSSL requests polling for a condition.
404
    # It is not advisable to have no timeout for shutdown.
405
    assert timeout
406

    
407
  else:
408
    raise AssertionError("Invalid socket operation")
409

    
410
  # Handshake is only supported by SSL sockets
411
  if (op == SOCKOP_HANDSHAKE and
412
      not isinstance(sock, OpenSSL.SSL.ConnectionType)):
413
    return
414

    
415
  # No override by default
416
  event_override = 0
417

    
418
  while True:
419
    # Poll only for certain operations and when asked for by an override
420
    if event_override or op in (SOCKOP_SEND, SOCKOP_RECV, SOCKOP_HANDSHAKE):
421
      if event_override:
422
        wait_for_event = event_override
423
      else:
424
        wait_for_event = event_poll
425

    
426
      event = WaitForSocketCondition(sock, wait_for_event, timeout)
427
      if event is None:
428
        raise HttpSocketTimeout()
429

    
430
      if (op == SOCKOP_RECV and
431
          event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
432
        return ""
433

    
434
      if not event & wait_for_event:
435
        continue
436

    
437
    # Reset override
438
    event_override = 0
439

    
440
    try:
441
      try:
442
        if op == SOCKOP_SEND:
443
          return sock.send(arg1)
444

    
445
        elif op == SOCKOP_RECV:
446
          return sock.recv(arg1)
447

    
448
        elif op == SOCKOP_SHUTDOWN:
449
          if isinstance(sock, OpenSSL.SSL.ConnectionType):
450
            # PyOpenSSL's shutdown() doesn't take arguments
451
            return sock.shutdown()
452
          else:
453
            return sock.shutdown(arg1)
454

    
455
        elif op == SOCKOP_HANDSHAKE:
456
          return sock.do_handshake()
457

    
458
      except OpenSSL.SSL.WantWriteError:
459
        # OpenSSL wants to write, poll for POLLOUT
460
        event_override = select.POLLOUT
461
        continue
462

    
463
      except OpenSSL.SSL.WantReadError:
464
        # OpenSSL wants to read, poll for POLLIN
465
        event_override = select.POLLIN | select.POLLPRI
466
        continue
467

    
468
      except OpenSSL.SSL.WantX509LookupError:
469
        continue
470

    
471
      except OpenSSL.SSL.ZeroReturnError, err:
472
        # SSL Connection has been closed. In SSL 3.0 and TLS 1.0, this only
473
        # occurs if a closure alert has occurred in the protocol, i.e. the
474
        # connection has been closed cleanly. Note that this does not
475
        # necessarily mean that the transport layer (e.g. a socket) has been
476
        # closed.
477
        if op == SOCKOP_SEND:
478
          # Can happen during a renegotiation
479
          raise HttpConnectionClosed(err.args)
480
        elif op == SOCKOP_RECV:
481
          return ""
482

    
483
        # SSL_shutdown shouldn't return SSL_ERROR_ZERO_RETURN
484
        raise socket.error(err.args)
485

    
486
      except OpenSSL.SSL.SysCallError, err:
487
        if op == SOCKOP_SEND:
488
          # arg1 is the data when writing
489
          if err.args and err.args[0] == -1 and arg1 == "":
490
            # errors when writing empty strings are expected
491
            # and can be ignored
492
            return 0
493

    
494
        if err.args == (-1, _SSL_UNEXPECTED_EOF):
495
          if op == SOCKOP_RECV:
496
            return ""
497
          elif op == SOCKOP_HANDSHAKE:
498
            # Can happen if peer disconnects directly after the connection is
499
            # opened.
500
            raise HttpSessionHandshakeUnexpectedEOF(err.args)
501

    
502
        raise socket.error(err.args)
503

    
504
      except OpenSSL.SSL.Error, err:
505
        raise socket.error(err.args)
506

    
507
    except socket.error, err:
508
      if err.args and err.args[0] == errno.EAGAIN:
509
        # Ignore EAGAIN
510
        continue
511

    
512
      raise
513

    
514

    
515
def ShutdownConnection(sock, close_timeout, write_timeout, msgreader, force):
516
  """Closes the connection.
517

518
  @type sock: socket
519
  @param sock: Socket to be shut down
520
  @type close_timeout: float
521
  @param close_timeout: How long to wait for the peer to close
522
      the connection
523
  @type write_timeout: float
524
  @param write_timeout: Write timeout for shutdown
525
  @type msgreader: http.HttpMessageReader
526
  @param msgreader: Request message reader, used to determine whether
527
      peer should close connection
528
  @type force: bool
529
  @param force: Whether to forcibly close the connection without
530
      waiting for peer
531

532
  """
533
  #print msgreader.peer_will_close, force
534
  if msgreader and msgreader.peer_will_close and not force:
535
    # Wait for peer to close
536
    try:
537
      # Check whether it's actually closed
538
      if not SocketOperation(sock, SOCKOP_RECV, 1, close_timeout):
539
        return
540
    except (socket.error, HttpError, HttpSocketTimeout):
541
      # Ignore errors at this stage
542
      pass
543

    
544
  # Close the connection from our side
545
  try:
546
    # We don't care about the return value, see NOTES in SSL_shutdown(3).
547
    SocketOperation(sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
548
                    write_timeout)
549
  except HttpSocketTimeout:
550
    raise HttpError("Timeout while shutting down connection")
551
  except socket.error, err:
552
    # Ignore ENOTCONN
553
    if not (err.args and err.args[0] == errno.ENOTCONN):
554
      raise HttpError("Error while shutting down connection: %s" % err)
555

    
556

    
557
def Handshake(sock, write_timeout):
558
  """Shakes peer's hands.
559

560
  @type sock: socket
561
  @param sock: Socket to be shut down
562
  @type write_timeout: float
563
  @param write_timeout: Write timeout for handshake
564

565
  """
566
  try:
567
    return SocketOperation(sock, SOCKOP_HANDSHAKE, None, write_timeout)
568
  except HttpSocketTimeout:
569
    raise HttpError("Timeout during SSL handshake")
570
  except socket.error, err:
571
    raise HttpError("Error in SSL handshake: %s" % err)
572

    
573

    
574
def InitSsl():
  """Checks that OpenSSL's PRNG is ready for use.

  This function is idempotent.

  @raise EnvironmentError: When OpenSSL reports that its PRNG hasn't been
      seeded with enough entropy

  """
  if OpenSSL.rand.status():
    # TODO: Maybe add some additional seeding for OpenSSL's PRNG
    return

  raise EnvironmentError("OpenSSL could not collect enough entropy"
                         " for the PRNG")
585

    
586

    
587
class HttpSslParams(object):
  """Container for an SSL key and certificate read from files.

  """
  def __init__(self, ssl_key_path, ssl_cert_path):
    """Reads key and certificate into memory.

    @type ssl_key_path: string
    @param ssl_key_path: Path to file containing SSL key in PEM format
    @type ssl_cert_path: string
    @param ssl_cert_path: Path to file containing SSL certificate
        in PEM format

    """
    self.ssl_key_pem = utils.ReadFile(ssl_key_path)
    self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)

  def GetKey(self):
    """Returns the private key loaded from the PEM data read at init.

    """
    crypto = OpenSSL.crypto
    return crypto.load_privatekey(crypto.FILETYPE_PEM, self.ssl_key_pem)

  def GetCertificate(self):
    """Returns the certificate loaded from the PEM data read at init.

    """
    crypto = OpenSSL.crypto
    return crypto.load_certificate(crypto.FILETYPE_PEM, self.ssl_cert_pem)
611

    
612

    
613
class HttpBase(object):
  """Common functionality of HTTP server and client.

  """
  def __init__(self):
    """Initializes the SSL related attributes to C{None}.

    """
    self._ssl_cert = None
    self._ssl_key = None
    self._ssl_params = None
    self.using_ssl = None

  def _CreateSocket(self, ssl_params, ssl_verify_peer):
    """Creates a TCP socket, wrapped into an SSL connection if requested.

    @type ssl_params: HttpSslParams
    @param ssl_params: SSL key and certificate
    @type ssl_verify_peer: bool
    @param ssl_verify_peer: Whether to require client certificate
        and compare it with our certificate
    @return: Plain socket if C{ssl_params} is C{None}, otherwise an
        OpenSSL connection object using the socket

    """
    self._ssl_params = ssl_params

    tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # SSL is enabled by passing SSL parameters
    self.using_ssl = ssl_params is not None
    if not self.using_ssl:
      return tcp_sock

    self._ssl_key = ssl_params.GetKey()
    self._ssl_cert = ssl_params.GetCertificate()

    context = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
    context.set_options(OpenSSL.SSL.OP_NO_SSLv2)

    context.use_privatekey(self._ssl_key)
    context.use_certificate(self._ssl_cert)
    context.check_privatekey()

    if ssl_verify_peer:
      verify_mode = (OpenSSL.SSL.VERIFY_PEER |
                     OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT)
      context.set_verify(verify_mode, self._SSLVerifyCallback)

    return OpenSSL.SSL.Connection(context, tcp_sock)

  def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
    """Checks the certificate presented by the peer.

    Only the SHA1 and MD5 digests are compared; the peer has to use the
    very same certificate as we do.

    @rtype: bool
    @return: Whether both digests match those of our certificate

    """
    # some parameters are unused, but this is the API
    # pylint: disable-msg=W0613
    assert self._ssl_params, "SSL not initialized"

    for algo in ("sha1", "md5"):
      if self._ssl_cert.digest(algo) != cert.digest(algo):
        return False

    return True
673

    
674

    
675
class HttpMessage(object):
  """Plain container for the parts of an HTTP message.

  """
  def __init__(self):
    """Creates an empty message, all parts are C{None}.

    """
    self.decoded_body = None
    self.body = None
    self.headers = None
    self.start_line = None
684

    
685

    
686
class HttpClientToServerStartLine(object):
  """Start line of an HTTP request (method, path and version).

  """
  def __init__(self, method, path, version):
    """Stores the three parts of the request line.

    """
    self.version = version
    self.path = path
    self.method = method

  def __str__(self):
    """Returns the parts separated by single spaces, method first.

    """
    parts = (self.method, self.path, self.version)
    return "%s %s %s" % parts
697

    
698

    
699
class HttpServerToClientStartLine(object):
  """Start line of an HTTP response (version, status code and reason).

  """
  def __init__(self, version, code, reason):
    """Stores the three parts of the status line.

    """
    self.reason = reason
    self.code = code
    self.version = version

  def __str__(self):
    """Returns the parts separated by single spaces, version first.

    """
    parts = (self.version, self.code, self.reason)
    return "%s %s %s" % parts
710

    
711

    
712
class HttpMessageWriter(object):
  """Serializes an HTTP message and sends it over a socket.

  All the work is done by the constructor.

  """
  def __init__(self, sock, msg, write_timeout):
    """Prepares, formats and sends the message.

    @type sock: socket
    @param sock: Socket to be written to
    @type msg: http.HttpMessage
    @param msg: HTTP message to be written
    @type write_timeout: float
    @param write_timeout: Write timeout for socket

    """
    self._msg = msg

    self._PrepareMessage()

    payload = self._FormatMessage()
    total = len(payload)

    offset = 0
    while offset < total:
      # At most SOCK_BUF_SIZE bytes are handed to the socket per call
      chunk = payload[offset:(offset + SOCK_BUF_SIZE)]

      # Continue after whatever was actually sent
      offset += SocketOperation(sock, SOCKOP_SEND, chunk, write_timeout)

    assert offset == total, "Message wasn't sent completely"

  def _PrepareMessage(self):
    """Sets the Content-Length header if the message has a body.

    """
    # RFC2616, section 4.3: "The presence of a message-body in a request is
    # signaled by the inclusion of a Content-Length or Transfer-Encoding header
    # field in the request's message-headers."
    body = self._msg.body
    if body:
      self._msg.headers[HTTP_CONTENT_LENGTH] = len(body)

  def _FormatMessage(self):
    """Builds the string which is sent over the wire.

    @rtype: string
    @return: Start line, headers (unless HTTP/0.9), empty line and body

    """
    msg = self._msg
    out = StringIO()

    # Start line
    out.write(str(msg.start_line))
    out.write("\r\n")

    # Headers, unknown to HTTP/0.9
    if msg.start_line.version != HTTP_0_9:
      for (name, value) in msg.headers.iteritems():
        out.write("%s: %s\r\n" % (name, value))

    # Empty line separating headers and body
    out.write("\r\n")

    # Body, unless the message isn't supposed to have one
    if self.HasMessageBody():
      out.write(msg.body)

    elif msg.body:
      logging.warning("Ignoring message body")

    return out.getvalue()

  def HasMessageBody(self):
    """Tells whether the message has a non-empty body.

    Can be overridden by subclasses.

    @rtype: bool

    """
    if self._msg.body:
      return True
    return False
789

    
790

    
791
class HttpMessageReader(object):
  """Reads HTTP message from socket.

  The constructor reads from the socket and feeds a small state machine
  (start line, headers, entity body) until the message is complete. The
  result is stored in the message object passed in.

  """
  # Length limits, None disables the respective check
  START_LINE_LENGTH_MAX = None
  HEADER_LENGTH_MAX = None

  # Parser state machine
  PS_START_LINE = "start-line"
  PS_HEADERS = "headers"
  PS_BODY = "entity-body"
  PS_COMPLETE = "complete"

  def __init__(self, sock, msg, read_timeout):
    """Reads an HTTP message from a socket.

    @type sock: socket
    @param sock: Socket to be read from
    @type msg: http.HttpMessage
    @param msg: Object for the read message
    @type read_timeout: float
    @param read_timeout: Read timeout for socket
    @raise HttpError: When the connection is closed before the headers
        were read completely or a length limit is exceeded

    """
    self.sock = sock
    self.msg = msg

    self.start_line_buffer = None
    self.header_buffer = StringIO()
    self.body_buffer = StringIO()
    self.parser_status = self.PS_START_LINE
    self.content_length = None
    self.peer_will_close = None

    buf = ""
    eof = False
    while self.parser_status != self.PS_COMPLETE:
      # TODO: Don't read more than necessary (Content-Length), otherwise
      # data might be lost and/or an error could occur
      data = SocketOperation(sock, SOCKOP_RECV, SOCK_BUF_SIZE, read_timeout)

      # An empty read is treated as end of file
      if data:
        buf += data
      else:
        eof = True

      # Do some parsing and error checking while more data arrives
      buf = self._ContinueParsing(buf, eof)

      # Must be done only after the buffer has been evaluated
      # TODO: Content-Length < len(data read) and connection closed
      if (eof and
          self.parser_status in (self.PS_START_LINE,
                                 self.PS_HEADERS)):
        raise HttpError("Connection closed prematurely")

    # Parse rest
    buf = self._ContinueParsing(buf, True)

    assert self.parser_status == self.PS_COMPLETE
    assert not buf, "Parser didn't read full response"

    msg.body = self.body_buffer.getvalue()

    # TODO: Content-type, error handling
    if msg.body:
      msg.decoded_body = HttpJsonConverter().Decode(msg.body)
    else:
      msg.decoded_body = None

    if msg.decoded_body:
      logging.debug("Message body: %s", msg.decoded_body)

  def _ContinueParsing(self, buf, eof):
    """Main function for HTTP message state machine.

    Consumes as much of the buffer as the current state allows and moves
    on to the next state(s) when possible.

    @type buf: string
    @param buf: Receive buffer
    @type eof: bool
    @param eof: Whether we've reached EOF on the socket
    @rtype: string
    @return: Updated receive buffer

    """
    # TODO: Use offset instead of slicing when possible
    if self.parser_status == self.PS_START_LINE:
      # RFC2616, section 4.1: "In the interest of robustness, servers SHOULD
      # ignore any empty line(s) received where a Request-Line is expected.
      # In other words, if the server is reading the protocol stream at the
      # beginning of a message and receives a CRLF first, it should ignore
      # the CRLF."
      #
      # The CRLF has to be removed from the front of the buffer; keeping
      # only the CRLF instead would never make any progress.
      # TODO: Limit number of CRLFs/empty lines for safety?
      while buf.startswith("\r\n"):
        buf = buf[2:]

      # Expect start line; with leading CRLFs gone idx can't be 0
      idx = buf.find("\r\n")
      if idx > 0:
        self.start_line_buffer = buf[:idx]

        self._CheckStartLineLength(len(self.start_line_buffer))

        # Remove status line, including CRLF
        buf = buf[idx + 2:]

        self.msg.start_line = self.ParseStartLine(self.start_line_buffer)

        self.parser_status = self.PS_HEADERS
      else:
        # Check whether incoming data is getting too large, otherwise we just
        # fill our read buffer.
        self._CheckStartLineLength(len(buf))

    # TODO: Handle messages without headers
    if self.parser_status == self.PS_HEADERS:
      # Wait for header end
      idx = buf.find("\r\n\r\n")
      if idx >= 0:
        # The CRLF terminating the last header line is kept
        self.header_buffer.write(buf[:idx + 2])

        self._CheckHeaderLength(self.header_buffer.tell())

        # Remove headers, including CRLF
        buf = buf[idx + 4:]

        self._ParseHeaders()

        self.parser_status = self.PS_BODY
      else:
        # Check whether incoming data is getting too large, otherwise we just
        # fill our read buffer.
        self._CheckHeaderLength(len(buf))

    if self.parser_status == self.PS_BODY:
      # TODO: Implement max size for body_buffer
      self.body_buffer.write(buf)
      buf = ""

      # Check whether we've read everything
      #
      # RFC2616, section 4.4: "When a message-body is included with a message,
      # the transfer-length of that body is determined by one of the following
      # [...] 5. By the server closing the connection. (Closing the connection
      # cannot be used to indicate the end of a request body, since that would
      # leave no possibility for the server to send back a response.)"
      #
      # TODO: Error when buffer length > Content-Length header
      if (eof or
          self.content_length is None or
          self.body_buffer.tell() >= self.content_length):
        self.parser_status = self.PS_COMPLETE

    return buf

  def _CheckStartLineLength(self, length):
    """Limits the start line buffer size.

    @type length: int
    @param length: Buffer size
    @raise HttpError: When a limit is set and C{length} exceeds it

    """
    if (self.START_LINE_LENGTH_MAX is not None and
        length > self.START_LINE_LENGTH_MAX):
      raise HttpError("Start line longer than %d chars" %
                       self.START_LINE_LENGTH_MAX)

  def _CheckHeaderLength(self, length):
    """Limits the header buffer size.

    @type length: int
    @param length: Buffer size
    @raise HttpError: When a limit is set and C{length} exceeds it

    """
    if (self.HEADER_LENGTH_MAX is not None and
        length > self.HEADER_LENGTH_MAX):
      raise HttpError("Headers longer than %d chars" % self.HEADER_LENGTH_MAX)

  def ParseStartLine(self, start_line):
    """Parses the start line of a message.

    Must be overridden by subclass.

    @type start_line: string
    @param start_line: Start line string

    """
    raise NotImplementedError()

  def _WillPeerCloseConnection(self):
    """Evaluate whether peer will close the connection.

    @rtype: bool
    @return: Whether peer will close the connection

    """
    # RFC2616, section 14.10: "HTTP/1.1 defines the "close" connection option
    # for the sender to signal that the connection will be closed after
    # completion of the response. For example,
    #
    #        Connection: close
    #
    # in either the request or the response header fields indicates that the
    # connection SHOULD NOT be considered `persistent' (section 8.1) after the
    # current request/response is complete."

    hdr_connection = self.msg.headers.get(HTTP_CONNECTION, None)
    if hdr_connection:
      hdr_connection = hdr_connection.lower()

    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
    if self.msg.start_line.version == HTTP_1_1:
      return (hdr_connection and "close" in hdr_connection)

    # Some HTTP/1.0 implementations have support for persistent connections,
    # using rules different than HTTP/1.1.

    # For older HTTP, Keep-Alive indicates persistent connection.
    if self.msg.headers.get(HTTP_KEEP_ALIVE):
      return False

    # At least Akamai returns a "Connection: Keep-Alive" header, which was
    # supposed to be sent by the client.
    if hdr_connection and "keep-alive" in hdr_connection:
      return False

    return True

  def _ParseHeaders(self):
    """Parses the headers.

    This function also adjusts internal variables based on header values.

    RFC2616, section 4.3: The presence of a message-body in a request is
    signaled by the inclusion of a Content-Length or Transfer-Encoding header
    field in the request's message-headers.

    """
    # Parse headers
    self.header_buffer.seek(0, 0)
    self.msg.headers = mimetools.Message(self.header_buffer, 0)

    self.peer_will_close = self._WillPeerCloseConnection()

    # Do we have a Content-Length header?
    hdr_content_length = self.msg.headers.get(HTTP_CONTENT_LENGTH, None)
    if hdr_content_length:
      # Values which aren't non-negative integers are treated like a
      # missing header
      try:
        self.content_length = int(hdr_content_length)
      except ValueError:
        self.content_length = None
      if self.content_length is not None and self.content_length < 0:
        self.content_length = None

    # if the connection remains open and a content-length was not provided,
    # then assume that the connection WILL close.
    if self.content_length is None:
      self.peer_will_close = True