Revision 9501323b

b/lib/http/__init__.py
22 22

  
23 23
"""
24 24

  
25
import BaseHTTPServer
26
import cgi
27 25
import logging
28 26
import mimetools
29 27
import OpenSSL
30
import os
31 28
import select
32 29
import socket
33
import time
34
import signal
35 30
import errno
36
import threading
37 31

  
38 32
from cStringIO import StringIO
39 33

  
40 34
from ganeti import constants
41 35
from ganeti import serializer
42
from ganeti import workerpool
43 36
from ganeti import utils
44 37

  
45 38

  
......
47 40

  
48 41
HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
49 42

  
50
WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
51
MONTHNAME = [None,
52
             'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
53
             'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
54

  
55
# Default error message
56
DEFAULT_ERROR_CONTENT_TYPE = "text/html"
57
DEFAULT_ERROR_MESSAGE = """\
58
<head>
59
<title>Error response</title>
60
</head>
61
<body>
62
<h1>Error response</h1>
63
<p>Error code %(code)d.
64
<p>Message: %(message)s.
65
<p>Error code explanation: %(code)s = %(explain)s.
66
</body>
67
"""
68

  
69 43
HTTP_OK = 200
70 44
HTTP_NO_CONTENT = 204
71 45
HTTP_NOT_MODIFIED = 304
......
97 71
 SOCKOP_SHUTDOWN) = range(3)
98 72

  
99 73

  
100
class SocketClosed(socket.error):
101
  pass
102

  
103

  
104 74
class HttpError(Exception):
105 75
  """Internal exception for HTTP errors.
106 76

  
......
449 419
            self._ssl_cert.digest("md5") == cert.digest("md5"))
450 420

  
451 421

  
452
class HttpServerRequestExecutor(object):
453
  """Implements server side of HTTP
454

  
455
  This class implements the server side of HTTP. It's based on code of Python's
456
  BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
457
  character encodings. Keep-alive connections are not supported.
458

  
459
  """
460
  # The default request version.  This only affects responses up until
461
  # the point where the request line is parsed, so it mainly decides what
462
  # the client gets back when sending a malformed request line.
463
  # Most web servers default to HTTP 0.9, i.e. don't send a status line.
464
  default_request_version = HTTP_0_9
465

  
466
  # Error message settings
467
  error_message_format = DEFAULT_ERROR_MESSAGE
468
  error_content_type = DEFAULT_ERROR_CONTENT_TYPE
469

  
470
  responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
471

  
472
  def __init__(self, server, conn, client_addr, fileio_class):
473
    """Initializes this class.
474

  
475
    Part of the initialization is reading the request and eventual POST/PUT
476
    data sent by the client.
477

  
478
    """
479
    self._server = server
480

  
481
    # We default rfile to buffered because otherwise it could be
482
    # really slow for large data (a getc() call per byte); we make
483
    # wfile unbuffered because (a) often after a write() we want to
484
    # read and we need to flush the line; (b) big writes to unbuffered
485
    # files are typically optimized by stdio even when big reads
486
    # aren't.
487
    self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
488
    self.wfile = fileio_class(conn, mode="wb", bufsize=0)
489

  
490
    self.client_addr = client_addr
491

  
492
    self.request_headers = None
493
    self.request_method = None
494
    self.request_path = None
495
    self.request_requestline = None
496
    self.request_version = self.default_request_version
497

  
498
    self.response_body = None
499
    self.response_code = HTTP_OK
500
    self.response_content_type = None
501
    self.response_headers = {}
502

  
503
    logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
504
    try:
505
      try:
506
        try:
507
          try:
508
            # Read, parse and handle request
509
            self._ReadRequest()
510
            self._ReadPostData()
511
            self._HandleRequest()
512
          except HttpException, err:
513
            self._SetErrorStatus(err)
514
        finally:
515
          # Try to send a response
516
          self._SendResponse()
517
          self._Close()
518
      except SocketClosed:
519
        pass
520
    finally:
521
      logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
522

  
523
  def _Close(self):
524
    if not self.wfile.closed:
525
      self.wfile.flush()
526
    self.wfile.close()
527
    self.rfile.close()
528

  
529
  def _DateTimeHeader(self):
530
    """Return the current date and time formatted for a message header.
531

  
532
    """
533
    (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
534
    return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
535
            (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
536

  
537
  def _SetErrorStatus(self, err):
538
    """Sets the response code and body from a HttpException.
539

  
540
    @type err: HttpException
541
    @param err: Exception instance
542

  
543
    """
544
    try:
545
      (shortmsg, longmsg) = self.responses[err.code]
546
    except KeyError:
547
      shortmsg = longmsg = "Unknown"
548

  
549
    if err.message:
550
      message = err.message
551
    else:
552
      message = shortmsg
553

  
554
    values = {
555
      "code": err.code,
556
      "message": cgi.escape(message),
557
      "explain": longmsg,
558
      }
559

  
560
    self.response_code = err.code
561
    self.response_content_type = self.error_content_type
562
    self.response_body = self.error_message_format % values
563

  
564
  def _HandleRequest(self):
565
    """Handle the actual request.
566

  
567
    Calls the actual handler function and converts exceptions into HTTP errors.
568

  
569
    """
570
    # Don't do anything if there's already been a problem
571
    if self.response_code != HTTP_OK:
572
      return
573

  
574
    assert self.request_method, "Status code %s requires a method" % HTTP_OK
575

  
576
    # Check whether client is still there
577
    self.rfile.read(0)
578

  
579
    try:
580
      try:
581
        result = self._server.HandleRequest(self)
582

  
583
        # TODO: Content-type
584
        encoder = HttpJsonConverter()
585
        body = encoder.Encode(result)
586

  
587
        self.response_content_type = encoder.CONTENT_TYPE
588
        self.response_body = body
589
      except (HttpException, KeyboardInterrupt, SystemExit):
590
        raise
591
      except Exception, err:
592
        logging.exception("Caught exception")
593
        raise HttpInternalError(message=str(err))
594
      except:
595
        logging.exception("Unknown exception")
596
        raise HttpInternalError(message="Unknown error")
597

  
598
    except HttpException, err:
599
      self._SetErrorStatus(err)
600

  
601
  def _SendResponse(self):
602
    """Sends response to the client.
603

  
604
    """
605
    # Check whether client is still there
606
    self.rfile.read(0)
607

  
608
    logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
609
                 self.request_requestline, self.response_code)
610

  
611
    if self.response_code in self.responses:
612
      response_message = self.responses[self.response_code][0]
613
    else:
614
      response_message = ""
615

  
616
    if self.request_version != HTTP_0_9:
617
      self.wfile.write("%s %d %s\r\n" %
618
                       (self.request_version, self.response_code,
619
                        response_message))
620
      self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
621
      self._SendHeader(HTTP_DATE, self._DateTimeHeader())
622
      self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
623
      self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
624
      for key, val in self.response_headers.iteritems():
625
        self._SendHeader(key, val)
626

  
627
      # We don't support keep-alive at this time
628
      self._SendHeader(HTTP_CONNECTION, "close")
629
      self.wfile.write("\r\n")
630

  
631
    if (self.request_method != HTTP_HEAD and
632
        self.response_code >= HTTP_OK and
633
        self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
634
      self.wfile.write(self.response_body)
635

  
636
  def _SendHeader(self, name, value):
637
    if self.request_version != HTTP_0_9:
638
      self.wfile.write("%s: %s\r\n" % (name, value))
639

  
640
  def _ReadRequest(self):
641
    """Reads and parses request line
642

  
643
    """
644
    raw_requestline = self.rfile.readline()
645

  
646
    requestline = raw_requestline
647
    if requestline[-2:] == '\r\n':
648
      requestline = requestline[:-2]
649
    elif requestline[-1:] == '\n':
650
      requestline = requestline[:-1]
651

  
652
    if not requestline:
653
      raise HttpBadRequest("Empty request line")
654

  
655
    self.request_requestline = requestline
656

  
657
    logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
658

  
659
    words = requestline.split()
660

  
661
    if len(words) == 3:
662
      [method, path, version] = words
663
      if version[:5] != 'HTTP/':
664
        raise HttpBadRequest("Bad request version (%r)" % version)
665

  
666
      try:
667
        base_version_number = version.split('/', 1)[1]
668
        version_number = base_version_number.split(".")
669

  
670
        # RFC 2145 section 3.1 says there can be only one "." and
671
        #   - major and minor numbers MUST be treated as
672
        #      separate integers;
673
        #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
674
        #      turn is lower than HTTP/12.3;
675
        #   - Leading zeros MUST be ignored by recipients.
676
        if len(version_number) != 2:
677
          raise HttpBadRequest("Bad request version (%r)" % version)
678

  
679
        version_number = int(version_number[0]), int(version_number[1])
680
      except (ValueError, IndexError):
681
        raise HttpBadRequest("Bad request version (%r)" % version)
682

  
683
      if version_number >= (2, 0):
684
        raise HttpVersionNotSupported("Invalid HTTP Version (%s)" %
685
                                      base_version_number)
686

  
687
    elif len(words) == 2:
688
      version = HTTP_0_9
689
      [method, path] = words
690
      if method != HTTP_GET:
691
        raise HttpBadRequest("Bad HTTP/0.9 request type (%r)" % method)
692

  
693
    else:
694
      raise HttpBadRequest("Bad request syntax (%r)" % requestline)
695

  
696
    # Examine the headers and look for a Connection directive
697
    headers = mimetools.Message(self.rfile, 0)
698

  
699
    self.request_method = method
700
    self.request_path = path
701
    self.request_version = version
702
    self.request_headers = headers
703

  
704
  def _ReadPostData(self):
705
    """Reads POST/PUT data
706

  
707
    Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
708
    a request is signaled by the inclusion of a Content-Length header field in
709
    the request message headers. HTTP/1.0 requests containing an entity body
710
    must include a valid Content-Length header field."
711

  
712
    """
713
    # While not according to specification, we only support an entity body for
714
    # POST and PUT.
715
    if (not self.request_method or
716
        self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
717
      self.request_post_data = None
718
      return
719

  
720
    content_length = None
721
    try:
722
      if HTTP_CONTENT_LENGTH in self.request_headers:
723
        content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
724
    except TypeError:
725
      pass
726
    except ValueError:
727
      pass
728

  
729
    # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
730
    if content_length is None:
731
      raise HttpLengthRequired("Missing Content-Length header or"
732
                               " invalid format")
733

  
734
    data = self.rfile.read(content_length)
735

  
736
    # TODO: Content-type, error handling
737
    if data:
738
      self.request_post_data = HttpJsonConverter().Decode(data)
739
    else:
740
      self.request_post_data = None
741

  
742
    logging.debug("HTTP POST data: %s", self.request_post_data)
743

  
744

  
745
class HttpServer(HttpSocketBase):
746
  """Generic HTTP server class
747

  
748
  Users of this class must subclass it and override the HandleRequest function.
749

  
750
  """
751
  MAX_CHILDREN = 20
752

  
753
  def __init__(self, mainloop, local_address, port,
754
               ssl_params=None, ssl_verify_peer=False):
755
    """Initializes the HTTP server
756

  
757
    @type mainloop: ganeti.daemon.Mainloop
758
    @param mainloop: Mainloop used to poll for I/O events
759
    @type local_addess: string
760
    @param local_address: Local IP address to bind to
761
    @type port: int
762
    @param port: TCP port to listen on
763
    @type ssl_params: HttpSslParams
764
    @param ssl_params: SSL key and certificate
765
    @type ssl_verify_peer: bool
766
    @param ssl_verify_peer: Whether to require client certificate and compare
767
                            it with our certificate
768

  
769
    """
770
    HttpSocketBase.__init__(self)
771

  
772
    self.mainloop = mainloop
773
    self.local_address = local_address
774
    self.port = port
775

  
776
    self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
777

  
778
    # Allow port to be reused
779
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
780

  
781
    if self._using_ssl:
782
      self._fileio_class = _SSLFileObject
783
    else:
784
      self._fileio_class = socket._fileobject
785

  
786
    self._children = []
787

  
788
    mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
789
    mainloop.RegisterSignal(self)
790

  
791
  def Start(self):
792
    self.socket.bind((self.local_address, self.port))
793
    self.socket.listen(5)
794

  
795
  def Stop(self):
796
    self.socket.close()
797

  
798
  def OnIO(self, fd, condition):
799
    if condition & select.POLLIN:
800
      self._IncomingConnection()
801

  
802
  def OnSignal(self, signum):
803
    if signum == signal.SIGCHLD:
804
      self._CollectChildren(True)
805

  
806
  def _CollectChildren(self, quick):
807
    """Checks whether any child processes are done
808

  
809
    @type quick: bool
810
    @param quick: Whether to only use non-blocking functions
811

  
812
    """
813
    if not quick:
814
      # Don't wait for other processes if it should be a quick check
815
      while len(self._children) > self.MAX_CHILDREN:
816
        try:
817
          # Waiting without a timeout brings us into a potential DoS situation.
818
          # As soon as too many children run, we'll not respond to new
819
          # requests. The real solution would be to add a timeout for children
820
          # and killing them after some time.
821
          pid, status = os.waitpid(0, 0)
822
        except os.error:
823
          pid = None
824
        if pid and pid in self._children:
825
          self._children.remove(pid)
826

  
827
    for child in self._children:
828
      try:
829
        pid, status = os.waitpid(child, os.WNOHANG)
830
      except os.error:
831
        pid = None
832
      if pid and pid in self._children:
833
        self._children.remove(pid)
834

  
835
  def _IncomingConnection(self):
836
    """Called for each incoming connection
837

  
838
    """
839
    (connection, client_addr) = self.socket.accept()
840

  
841
    self._CollectChildren(False)
842

  
843
    pid = os.fork()
844
    if pid == 0:
845
      # Child process
846
      try:
847
        HttpServerRequestExecutor(self, connection, client_addr,
848
                                  self._fileio_class)
849
      except:
850
        logging.exception("Error while handling request from %s:%s",
851
                          client_addr[0], client_addr[1])
852
        os._exit(1)
853
      os._exit(0)
854
    else:
855
      self._children.append(pid)
856

  
857
  def HandleRequest(self, req):
858
    raise NotImplementedError()
859

  
860

  
861
class HttpClientRequest(object):
862
  def __init__(self, host, port, method, path, headers=None, post_data=None,
863
               ssl_params=None, ssl_verify_peer=False):
864
    """Describes an HTTP request.
865

  
866
    @type host: string
867
    @param host: Hostname
868
    @type port: int
869
    @param port: Port
870
    @type method: string
871
    @param method: Method name
872
    @type path: string
873
    @param path: Request path
874
    @type headers: dict or None
875
    @param headers: Additional headers to send
876
    @type post_data: string or None
877
    @param post_data: Additional data to send
878
    @type ssl_params: HttpSslParams
879
    @param ssl_params: SSL key and certificate
880
    @type ssl_verify_peer: bool
881
    @param ssl_verify_peer: Whether to compare our certificate with server's
882
                            certificate
883

  
884
    """
885
    if post_data is not None:
886
      assert method.upper() in (HTTP_POST, HTTP_PUT), \
887
        "Only POST and GET requests support sending data"
888

  
889
    assert path.startswith("/"), "Path must start with slash (/)"
890

  
891
    self.host = host
892
    self.port = port
893
    self.ssl_params = ssl_params
894
    self.ssl_verify_peer = ssl_verify_peer
895
    self.method = method
896
    self.path = path
897
    self.headers = headers
898
    self.post_data = post_data
899

  
900
    self.success = None
901
    self.error = None
902

  
903
    self.resp_status_line = None
904
    self.resp_version = None
905
    self.resp_status = None
906
    self.resp_reason = None
907
    self.resp_headers = None
908
    self.resp_body = None
909

  
910

  
911
class HttpClientRequestExecutor(HttpSocketBase):
912
  # Default headers
913
  DEFAULT_HEADERS = {
914
    HTTP_USER_AGENT: HTTP_GANETI_VERSION,
915
    # TODO: For keep-alive, don't send "Connection: close"
916
    HTTP_CONNECTION: "close",
917
    }
918

  
919
  # Length limits
920
  STATUS_LINE_LENGTH_MAX = 512
921
  HEADER_LENGTH_MAX = 4 * 1024
922

  
923
  # Timeouts in seconds for socket layer
924
  # TODO: Make read timeout configurable per OpCode
925
  CONNECT_TIMEOUT = 5.0
926
  WRITE_TIMEOUT = 10
927
  READ_TIMEOUT = None
928
  CLOSE_TIMEOUT = 1
929

  
930
  # Parser state machine
931
  PS_STATUS_LINE = "status-line"
932
  PS_HEADERS = "headers"
933
  PS_BODY = "body"
934
  PS_COMPLETE = "complete"
935

  
936
  def __init__(self, req):
937
    """Initializes the HttpClientRequestExecutor class.
938

  
939
    @type req: HttpClientRequest
940
    @param req: Request object
941

  
942
    """
943
    HttpSocketBase.__init__(self)
944

  
945
    self.request = req
946

  
947
    self.parser_status = self.PS_STATUS_LINE
948
    self.header_buffer = StringIO()
949
    self.body_buffer = StringIO()
950
    self.content_length = None
951
    self.server_will_close = None
952

  
953
    self.poller = select.poll()
954

  
955
    try:
956
      # TODO: Implement connection caching/keep-alive
957
      self.sock = self._CreateSocket(req.ssl_params,
958
                                     req.ssl_verify_peer)
959

  
960
      # Disable Python's timeout
961
      self.sock.settimeout(None)
962

  
963
      # Operate in non-blocking mode
964
      self.sock.setblocking(0)
965

  
966
      force_close = True
967
      self._Connect()
968
      try:
969
        self._SendRequest()
970
        self._ReadResponse()
971

  
972
        # Only wait for server to close if we didn't have any exception.
973
        force_close = False
974
      finally:
975
        self._CloseConnection(force_close)
976

  
977
      self.sock.close()
978
      self.sock = None
979

  
980
      req.resp_body = self.body_buffer.getvalue()
981

  
982
      req.success = True
983
      req.error = None
984

  
985
    except _HttpClientError, err:
986
      req.success = False
987
      req.error = str(err)
988

  
989
  def _BuildRequest(self):
990
    """Build HTTP request.
991

  
992
    @rtype: string
993
    @return: Complete request
994

  
995
    """
996
    # Headers
997
    send_headers = self.DEFAULT_HEADERS.copy()
998

  
999
    if self.request.headers:
1000
      send_headers.update(self.request.headers)
1001

  
1002
    send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
1003

  
1004
    if self.request.post_data:
1005
      send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
1006

  
1007
    buf = StringIO()
1008

  
1009
    # Add request line. We only support HTTP/1.0 (no chunked transfers and no
1010
    # keep-alive).
1011
    # TODO: For keep-alive, change to HTTP/1.1
1012
    buf.write("%s %s %s\r\n" % (self.request.method.upper(),
1013
                                self.request.path, HTTP_1_0))
1014

  
1015
    # Add headers
1016
    for name, value in send_headers.iteritems():
1017
      buf.write("%s: %s\r\n" % (name, value))
1018

  
1019
    buf.write("\r\n")
1020

  
1021
    if self.request.post_data:
1022
      buf.write(self.request.post_data)
1023

  
1024
    return buf.getvalue()
1025

  
1026
  def _ParseStatusLine(self):
1027
    """Parses the status line sent by the server.
1028

  
1029
    """
1030
    line = self.request.resp_status_line
1031

  
1032
    if not line:
1033
      raise _HttpClientError("Empty status line")
1034

  
1035
    try:
1036
      [version, status, reason] = line.split(None, 2)
1037
    except ValueError:
1038
      try:
1039
        [version, status] = line.split(None, 1)
1040
        reason = ""
1041
      except ValueError:
1042
        version = HTTP_9_0
1043

  
1044
    if version:
1045
      version = version.upper()
1046

  
1047
    if version not in (HTTP_1_0, HTTP_1_1):
1048
      # We do not support HTTP/0.9, despite the specification requiring it
1049
      # (RFC2616, section 19.6)
1050
      raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
1051
                             line)
1052

  
1053
    # The status code is a three-digit number
1054
    try:
1055
      status = int(status)
1056
      if status < 100 or status > 999:
1057
        status = -1
1058
    except ValueError:
1059
      status = -1
1060

  
1061
    if status == -1:
1062
      raise _HttpClientError("Invalid status code (%r)" % line)
1063

  
1064
    self.request.resp_version = version
1065
    self.request.resp_status = status
1066
    self.request.resp_reason = reason
1067

  
1068
  def _WillServerCloseConnection(self):
1069
    """Evaluate whether server will close the connection.
1070

  
1071
    @rtype: bool
1072
    @return: Whether server will close the connection
1073

  
1074
    """
1075
    hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
1076
    if hdr_connection:
1077
      hdr_connection = hdr_connection.lower()
1078

  
1079
    # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
1080
    if self.request.resp_version == HTTP_1_1:
1081
      return (hdr_connection and "close" in hdr_connection)
1082

  
1083
    # Some HTTP/1.0 implementations have support for persistent connections,
1084
    # using rules different than HTTP/1.1.
1085

  
1086
    # For older HTTP, Keep-Alive indicates persistent connection.
1087
    if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
1088
      return False
1089

  
1090
    # At least Akamai returns a "Connection: Keep-Alive" header, which was
1091
    # supposed to be sent by the client.
1092
    if hdr_connection and "keep-alive" in hdr_connection:
1093
      return False
1094

  
1095
    return True
1096

  
1097
  def _ParseHeaders(self):
1098
    """Parses the headers sent by the server.
1099

  
1100
    This function also adjusts internal variables based on the header values.
1101

  
1102
    """
1103
    req = self.request
1104

  
1105
    # Parse headers
1106
    self.header_buffer.seek(0, 0)
1107
    req.resp_headers = mimetools.Message(self.header_buffer, 0)
1108

  
1109
    self.server_will_close = self._WillServerCloseConnection()
1110

  
1111
    # Do we have a Content-Length header?
1112
    hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
1113
    if hdr_content_length:
1114
      try:
1115
        self.content_length = int(hdr_content_length)
1116
      except ValueError:
1117
        pass
1118
      if self.content_length is not None and self.content_length < 0:
1119
        self.content_length = None
1120

  
1121
    # does the body have a fixed length? (of zero)
1122
    if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
1123
        100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
1124
      self.content_length = 0
1125

  
1126
    # if the connection remains open and a content-length was not provided,
1127
    # then assume that the connection WILL close.
1128
    if self.content_length is None:
1129
      self.server_will_close = True
1130

  
1131
  def _CheckStatusLineLength(self, length):
1132
    if length > self.STATUS_LINE_LENGTH_MAX:
1133
      raise _HttpClientError("Status line longer than %d chars" %
1134
                             self.STATUS_LINE_LENGTH_MAX)
1135

  
1136
  def _CheckHeaderLength(self, length):
1137
    if length > self.HEADER_LENGTH_MAX:
1138
      raise _HttpClientError("Headers longer than %d chars" %
1139
                             self.HEADER_LENGTH_MAX)
1140

  
1141
  def _ParseBuffer(self, buf, eof):
1142
    """Main function for HTTP response state machine.
1143

  
1144
    @type buf: string
1145
    @param buf: Receive buffer
1146
    @type eof: bool
1147
    @param eof: Whether we've reached EOF on the socket
1148
    @rtype: string
1149
    @return: Updated receive buffer
1150

  
1151
    """
1152
    if self.parser_status == self.PS_STATUS_LINE:
1153
      # Expect status line
1154
      idx = buf.find("\r\n")
1155
      if idx >= 0:
1156
        self.request.resp_status_line = buf[:idx]
1157

  
1158
        self._CheckStatusLineLength(len(self.request.resp_status_line))
1159

  
1160
        # Remove status line, including CRLF
1161
        buf = buf[idx + 2:]
1162

  
1163
        self._ParseStatusLine()
1164

  
1165
        self.parser_status = self.PS_HEADERS
1166
      else:
1167
        # Check whether incoming data is getting too large, otherwise we just
1168
        # fill our read buffer.
1169
        self._CheckStatusLineLength(len(buf))
1170

  
1171
    if self.parser_status == self.PS_HEADERS:
1172
      # Wait for header end
1173
      idx = buf.find("\r\n\r\n")
1174
      if idx >= 0:
1175
        self.header_buffer.write(buf[:idx + 2])
1176

  
1177
        self._CheckHeaderLength(self.header_buffer.tell())
1178

  
1179
        # Remove headers, including CRLF
1180
        buf = buf[idx + 4:]
1181

  
1182
        self._ParseHeaders()
1183

  
1184
        self.parser_status = self.PS_BODY
1185
      else:
1186
        # Check whether incoming data is getting too large, otherwise we just
1187
        # fill our read buffer.
1188
        self._CheckHeaderLength(len(buf))
1189

  
1190
    if self.parser_status == self.PS_BODY:
1191
      self.body_buffer.write(buf)
1192
      buf = ""
1193

  
1194
      # Check whether we've read everything
1195
      if (eof or
1196
          (self.content_length is not None and
1197
           self.body_buffer.tell() >= self.content_length)):
1198
        self.parser_status = self.PS_COMPLETE
1199

  
1200
    return buf
1201

  
1202
  def _Connect(self):
1203
    """Non-blocking connect to host with timeout.
1204

  
1205
    """
1206
    connected = False
1207
    while True:
1208
      try:
1209
        connect_error = self.sock.connect_ex((self.request.host,
1210
                                              self.request.port))
1211
      except socket.gaierror, err:
1212
        raise _HttpClientError("Connection failed: %s" % str(err))
1213

  
1214
      if connect_error == errno.EINTR:
1215
        # Mask signals
1216
        pass
1217

  
1218
      elif connect_error == 0:
1219
        # Connection established
1220
        connected = True
1221
        break
1222

  
1223
      elif connect_error == errno.EINPROGRESS:
1224
        # Connection started
1225
        break
1226

  
1227
      raise _HttpClientError("Connection failed (%s: %s)" %
1228
                             (connect_error, os.strerror(connect_error)))
1229

  
1230
    if not connected:
1231
      # Wait for connection
1232
      event = WaitForSocketCondition(self.poller, self.sock,
1233
                                     select.POLLOUT, self.CONNECT_TIMEOUT)
1234
      if event is None:
1235
        raise _HttpClientError("Timeout while connecting to server")
1236

  
1237
      # Get error code
1238
      connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1239
      if connect_error != 0:
1240
        raise _HttpClientError("Connection failed (%s: %s)" %
1241
                               (connect_error, os.strerror(connect_error)))
1242

  
1243
    # Enable TCP keep-alive
1244
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1245

  
1246
    # If needed, Linux specific options are available to change the TCP
1247
    # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1248
    # TCP_KEEPINTVL.
1249

  
1250
  def _SendRequest(self):
1251
    """Sends request to server.
1252

  
1253
    """
1254
    buf = self._BuildRequest()
1255

  
1256
    while buf:
1257
      # Send only 4 KB at a time
1258
      data = buf[:4096]
1259

  
1260
      try:
1261
        sent = SocketOperation(self.poller, self.sock, SOCKOP_SEND, data,
1262
                               self.WRITE_TIMEOUT)
1263
      except HttpSocketTimeout:
1264
        raise _HttpClientError("Timeout while sending request")
1265
      except socket.error, err:
1266
        raise _HttpClientError("Error sending request: %s" % err)
1267

  
1268
      # Remove sent bytes
1269
      buf = buf[sent:]
1270

  
1271
    assert not buf, "Request wasn't sent completely"
1272

  
1273
  def _ReadResponse(self):
1274
    """Read response from server.
1275

  
1276
    Calls the parser function after reading a chunk of data.
1277

  
1278
    """
1279
    buf = ""
1280
    eof = False
1281
    while self.parser_status != self.PS_COMPLETE:
1282
      try:
1283
        data = SocketOperation(self.poller, self.sock, SOCKOP_RECV, 4096,
1284
                               self.READ_TIMEOUT)
1285
      except HttpSocketTimeout:
1286
        raise _HttpClientError("Timeout while reading response")
1287
      except socket.error, err:
1288
        raise _HttpClientError("Error while reading response: %s" % err)
1289

  
1290
      if data:
1291
        buf += data
1292
      else:
1293
        eof = True
1294

  
1295
      # Do some parsing and error checking while more data arrives
1296
      buf = self._ParseBuffer(buf, eof)
1297

  
1298
      # Must be done only after the buffer has been evaluated
1299
      if (eof and
1300
          self.parser_status in (self.PS_STATUS_LINE,
1301
                                 self.PS_HEADERS)):
1302
        raise _HttpClientError("Connection closed prematurely")
1303

  
1304
    # Parse rest
1305
    buf = self._ParseBuffer(buf, True)
1306

  
1307
    assert self.parser_status == self.PS_COMPLETE
1308
    assert not buf, "Parser didn't read full response"
1309

  
1310
  def _CloseConnection(self, force):
1311
    """Closes the connection.
1312

  
1313
    """
1314
    if self.server_will_close and not force:
1315
      # Wait for server to close
1316
      try:
1317
        # Check whether it's actually closed
1318
        if not SocketOperation(self.poller, self.sock, SOCKOP_RECV, 1,
1319
                               self.CLOSE_TIMEOUT):
1320
          return
1321
      except (socket.error, _HttpClientError, HttpSocketTimeout):
1322
        # Ignore errors at this stage
1323
        pass
1324

  
1325
    # Close the connection from our side
1326
    try:
1327
      SocketOperation(self.poller, self.sock, SOCKOP_SHUTDOWN,
1328
                      socket.SHUT_RDWR, self.WRITE_TIMEOUT)
1329
    except HttpSocketTimeout:
1330
      raise _HttpClientError("Timeout while shutting down connection")
1331
    except socket.error, err:
1332
      raise _HttpClientError("Error while shutting down connection: %s" % err)
1333

  
1334

  
1335
class _HttpClientPendingRequest(object):
1336
  """Data class for pending requests.
1337

  
1338
  """
1339
  def __init__(self, request):
1340
    self.request = request
1341

  
1342
    # Thread synchronization
1343
    self.done = threading.Event()
1344

  
1345

  
1346
class HttpClientWorker(workerpool.BaseWorker):
1347
  """HTTP client worker class.
1348

  
1349
  """
1350
  def RunTask(self, pend_req):
1351
    try:
1352
      HttpClientRequestExecutor(pend_req.request)
1353
    finally:
1354
      pend_req.done.set()
1355

  
1356

  
1357
class HttpClientWorkerPool(workerpool.WorkerPool):
1358
  def __init__(self, manager):
1359
    workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
1360
                                   HttpClientWorker)
1361
    self.manager = manager
1362

  
1363

  
1364
class HttpClientManager(object):
1365
  """Manages HTTP requests.
1366

  
1367
  """
1368
  def __init__(self):
1369
    self._wpool = HttpClientWorkerPool(self)
1370

  
1371
  def __del__(self):
1372
    self.Shutdown()
1373

  
1374
  def ExecRequests(self, requests):
1375
    """Execute HTTP requests.
1376

  
1377
    This function can be called from multiple threads at the same time.
1378

  
1379
    @type requests: List of HttpClientRequest instances
1380
    @param requests: The requests to execute
1381
    @rtype: List of HttpClientRequest instances
1382
    @returns: The list of requests passed in
1383

  
1384
    """
1385
    # _HttpClientPendingRequest is used for internal thread synchronization
1386
    pending = [_HttpClientPendingRequest(req) for req in requests]
1387

  
1388
    try:
1389
      # Add requests to queue
1390
      for pend_req in pending:
1391
        self._wpool.AddTask(pend_req)
1392

  
1393
    finally:
1394
      # In case of an exception we should still wait for the rest, otherwise
1395
      # another thread from the worker pool could modify the request object
1396
      # after we returned.
1397

  
1398
      # And wait for them to finish
1399
      for pend_req in pending:
1400
        pend_req.done.wait()
1401

  
1402
    # Return original list
1403
    return requests
1404

  
1405
  def Shutdown(self):
1406
    self._wpool.Quiesce()
1407
    self._wpool.TerminateWorkers()
1408

  
1409

  
1410
class _SSLFileObject(object):
1411
  """Wrapper around socket._fileobject
1412

  
1413
  This wrapper is required to handle OpenSSL exceptions.
1414

  
1415
  """
1416
  def _RequireOpenSocket(fn):
1417
    def wrapper(self, *args, **kwargs):
1418
      if self.closed:
1419
        raise SocketClosed("Socket is closed")
1420
      return fn(self, *args, **kwargs)
1421
    return wrapper
1422

  
1423
  def __init__(self, sock, mode='rb', bufsize=-1):
1424
    self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1425

  
1426
  def _ConnectionLost(self):
1427
    self._base = None
1428

  
1429
  def _getclosed(self):
1430
    return self._base is None or self._base.closed
1431
  closed = property(_getclosed, doc="True if the file is closed")
1432

  
1433
  @_RequireOpenSocket
1434
  def close(self):
1435
    return self._base.close()
1436

  
1437
  @_RequireOpenSocket
1438
  def flush(self):
1439
    return self._base.flush()
1440

  
1441
  @_RequireOpenSocket
1442
  def fileno(self):
1443
    return self._base.fileno()
1444

  
1445
  @_RequireOpenSocket
1446
  def read(self, size=-1):
1447
    return self._ReadWrapper(self._base.read, size=size)
1448

  
1449
  @_RequireOpenSocket
1450
  def readline(self, size=-1):
1451
    return self._ReadWrapper(self._base.readline, size=size)
1452

  
1453
  def _ReadWrapper(self, fn, *args, **kwargs):
1454
    while True:
1455
      try:
1456
        return fn(*args, **kwargs)
1457

  
1458
      except OpenSSL.SSL.ZeroReturnError, err:
1459
        self._ConnectionLost()
1460
        return ""
1461

  
1462
      except OpenSSL.SSL.WantReadError:
1463
        continue
1464

  
1465
      #except OpenSSL.SSL.WantWriteError:
1466
      # TODO
1467

  
1468
      except OpenSSL.SSL.SysCallError, (retval, desc):
1469
        if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
1470
            or retval > 0):
1471
          self._ConnectionLost()
1472
          return ""
1473

  
1474
        logging.exception("Error in OpenSSL")
1475
        self._ConnectionLost()
1476
        raise socket.error(err.args)
1477

  
1478
      except OpenSSL.SSL.Error, err:
1479
        self._ConnectionLost()
1480
        raise socket.error(err.args)
1481

  
1482
  @_RequireOpenSocket
1483
  def write(self, data):
1484
    return self._WriteWrapper(self._base.write, data)
1485

  
1486
  def _WriteWrapper(self, fn, *args, **kwargs):
1487
    while True:
1488
      try:
1489
        return fn(*args, **kwargs)
1490
      except OpenSSL.SSL.ZeroReturnError, err:
1491
        self._ConnectionLost()
1492
        return 0
1493

  
1494
      except OpenSSL.SSL.WantWriteError:
1495
        continue
1496

  
1497
      #except OpenSSL.SSL.WantReadError:
1498
      # TODO
1499

  
1500
      except OpenSSL.SSL.SysCallError, err:
1501
        if err.args[0] == -1 and data == "":
1502
          # errors when writing empty strings are expected
1503
          # and can be ignored
1504
          return 0
1505

  
1506
        self._ConnectionLost()
1507
        raise socket.error(err.args)
1508

  
1509
      except OpenSSL.SSL.Error, err:
1510
        self._ConnectionLost()
1511
        raise socket.error(err.args)
1512

  
1513

  
1514 422
class HttpMessage(object):
1515 423
  """Data structure for HTTP message.
1516 424

  

Also available in: Unified diff