9acfa5416e51d846083fef731f6097e2d55512b1
[ganeti-local] / lib / http.py
1 #
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 # General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16 # 02110-1301, USA.
17
18 """HTTP server module.
19
20 """
21
22 import BaseHTTPServer
23 import cgi
24 import logging
25 import mimetools
26 import OpenSSL
27 import os
28 import select
29 import socket
30 import sys
31 import time
32 import signal
33 import logging
34 import errno
35 import threading
36
37 from cStringIO import StringIO
38
39 from ganeti import constants
40 from ganeti import serializer
41 from ganeti import workerpool
42 from ganeti import utils
43
44
45 HTTP_CLIENT_THREADS = 10
46
47 HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
48
49 WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
50 MONTHNAME = [None,
51              'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
52              'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
53
54 # Default error message
55 DEFAULT_ERROR_CONTENT_TYPE = "text/html"
56 DEFAULT_ERROR_MESSAGE = """\
57 <head>
58 <title>Error response</title>
59 </head>
60 <body>
61 <h1>Error response</h1>
62 <p>Error code %(code)d.
63 <p>Message: %(message)s.
64 <p>Error code explanation: %(code)s = %(explain)s.
65 </body>
66 """
67
68 HTTP_OK = 200
69 HTTP_NO_CONTENT = 204
70 HTTP_NOT_MODIFIED = 304
71
72 HTTP_0_9 = "HTTP/0.9"
73 HTTP_1_0 = "HTTP/1.0"
74 HTTP_1_1 = "HTTP/1.1"
75
76 HTTP_GET = "GET"
77 HTTP_HEAD = "HEAD"
78 HTTP_POST = "POST"
79 HTTP_PUT = "PUT"
80
81 HTTP_ETAG = "ETag"
82 HTTP_HOST = "Host"
83 HTTP_SERVER = "Server"
84 HTTP_DATE = "Date"
85 HTTP_USER_AGENT = "User-Agent"
86 HTTP_CONTENT_TYPE = "Content-Type"
87 HTTP_CONTENT_LENGTH = "Content-Length"
88 HTTP_CONNECTION = "Connection"
89 HTTP_KEEP_ALIVE = "Keep-Alive"
90
91 _SSL_UNEXPECTED_EOF = "Unexpected EOF"
92
93
94 class SocketClosed(socket.error):
95   pass
96
97
98 class _HttpClientError(Exception):
99   """Internal exception for HTTP client errors.
100
101   This should only be used for internal error reporting.
102
103   """
104   pass
105
106
107 class HTTPException(Exception):
108   code = None
109   message = None
110
111   def __init__(self, message=None):
112     Exception.__init__(self)
113     if message is not None:
114       self.message = message
115
116
117 class HTTPBadRequest(HTTPException):
118   code = 400
119
120
121 class HTTPForbidden(HTTPException):
122   code = 403
123
124
125 class HTTPNotFound(HTTPException):
126   code = 404
127
128
129 class HTTPGone(HTTPException):
130   code = 410
131
132
133 class HTTPLengthRequired(HTTPException):
134   code = 411
135
136
137 class HTTPInternalError(HTTPException):
138   code = 500
139
140
141 class HTTPNotImplemented(HTTPException):
142   code = 501
143
144
145 class HTTPServiceUnavailable(HTTPException):
146   code = 503
147
148
149 class HTTPVersionNotSupported(HTTPException):
150   code = 505
151
152
153 class ApacheLogfile:
154   """Utility class to write HTTP server log files.
155
156   The written format is the "Common Log Format" as defined by Apache:
157   http://httpd.apache.org/docs/2.2/mod/mod_log_config.html#examples
158
159   """
160   def __init__(self, fd):
161     """Constructor for ApacheLogfile class.
162
163     Args:
164     - fd: Open file object
165
166     """
167     self._fd = fd
168
169   def LogRequest(self, request, format, *args):
170     self._fd.write("%s %s %s [%s] %s\n" % (
171       # Remote host address
172       request.address_string(),
173
174       # RFC1413 identity (identd)
175       "-",
176
177       # Remote user
178       "-",
179
180       # Request time
181       self._FormatCurrentTime(),
182
183       # Message
184       format % args,
185       ))
186     self._fd.flush()
187
188   def _FormatCurrentTime(self):
189     """Formats current time in Common Log Format.
190
191     """
192     return self._FormatLogTime(time.time())
193
194   def _FormatLogTime(self, seconds):
195     """Formats time for Common Log Format.
196
197     All timestamps are logged in the UTC timezone.
198
199     Args:
200     - seconds: Time in seconds since the epoch
201
202     """
203     (_, month, _, _, _, _, _, _, _) = tm = time.gmtime(seconds)
204     format = "%d/" + MONTHNAME[month] + "/%Y:%H:%M:%S +0000"
205     return time.strftime(format, tm)
206
207
208 class HTTPJsonConverter:
209   CONTENT_TYPE = "application/json"
210
211   def Encode(self, data):
212     return serializer.DumpJson(data)
213
214   def Decode(self, data):
215     return serializer.LoadJson(data)
216
217
218 class _HttpSocketBase(object):
219   """Base class for HTTP server and client.
220
221   """
222   def __init__(self):
223     self._using_ssl = None
224     self._ssl_cert = None
225     self._ssl_key = None
226
227   def _CreateSocket(self, ssl_key_path, ssl_cert_path, ssl_verify_peer):
228     """Creates a TCP socket and initializes SSL if needed.
229
230     @type ssl_key_path: string
231     @param ssl_key_path: Path to file containing SSL key in PEM format
232     @type ssl_cert_path: string
233     @param ssl_cert_path: Path to file containing SSL certificate in PEM format
234     @type ssl_verify_peer: bool
235     @param ssl_verify_peer: Whether to require client certificate and compare
236                             it with our certificate
237
238     """
239     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
240
241     # Should we enable SSL?
242     self._using_ssl = (ssl_cert_path and ssl_key_path)
243
244     if not self._using_ssl:
245       return sock
246
247     ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
248     ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
249
250     ssl_key_pem = utils.ReadFile(ssl_key_path)
251     ssl_cert_pem = utils.ReadFile(ssl_cert_path)
252
253     cr = OpenSSL.crypto
254     self._ssl_cert = cr.load_certificate(cr.FILETYPE_PEM, ssl_cert_pem)
255     self._ssl_key = cr.load_privatekey(cr.FILETYPE_PEM, ssl_key_pem)
256     del cr
257
258     ctx.use_privatekey(self._ssl_key)
259     ctx.use_certificate(self._ssl_cert)
260     ctx.check_privatekey()
261
262     if ssl_verify_peer:
263       ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
264                      OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
265                      self._SSLVerifyCallback)
266
267     return OpenSSL.SSL.Connection(ctx, sock)
268
269   def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
270     """Verify the certificate provided by the peer
271
272     We only compare fingerprints. The client must use the same certificate as
273     we do on our side.
274
275     """
276     assert self._ssl_cert and self._ssl_key, "SSL not initialized"
277
278     return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
279             self._ssl_cert.digest("md5") == cert.digest("md5"))
280
281
282 class _HttpConnectionHandler(object):
283   """Implements server side of HTTP
284
285   This class implements the server side of HTTP. It's based on code of Python's
286   BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
287   character encodings. Keep-alive connections are not supported.
288
289   """
290   # The default request version.  This only affects responses up until
291   # the point where the request line is parsed, so it mainly decides what
292   # the client gets back when sending a malformed request line.
293   # Most web servers default to HTTP 0.9, i.e. don't send a status line.
294   default_request_version = HTTP_0_9
295
296   # Error message settings
297   error_message_format = DEFAULT_ERROR_MESSAGE
298   error_content_type = DEFAULT_ERROR_CONTENT_TYPE
299
300   responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
301
302   def __init__(self, server, conn, client_addr, fileio_class):
303     """Initializes this class.
304
305     Part of the initialization is reading the request and eventual POST/PUT
306     data sent by the client.
307
308     """
309     self._server = server
310
311     # We default rfile to buffered because otherwise it could be
312     # really slow for large data (a getc() call per byte); we make
313     # wfile unbuffered because (a) often after a write() we want to
314     # read and we need to flush the line; (b) big writes to unbuffered
315     # files are typically optimized by stdio even when big reads
316     # aren't.
317     self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
318     self.wfile = fileio_class(conn, mode="wb", bufsize=0)
319
320     self.client_addr = client_addr
321
322     self.request_headers = None
323     self.request_method = None
324     self.request_path = None
325     self.request_requestline = None
326     self.request_version = self.default_request_version
327
328     self.response_body = None
329     self.response_code = HTTP_OK
330     self.response_content_type = None
331     self.response_headers = {}
332
333     self.should_fork = False
334
335     try:
336       self._ReadRequest()
337       self._ReadPostData()
338     except HTTPException, err:
339       self._SetErrorStatus(err)
340
341   def Close(self):
342     if not self.wfile.closed:
343       self.wfile.flush()
344     self.wfile.close()
345     self.rfile.close()
346
347   def _DateTimeHeader(self):
348     """Return the current date and time formatted for a message header.
349
350     """
351     (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
352     return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
353             (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
354
355   def _SetErrorStatus(self, err):
356     """Sets the response code and body from a HTTPException.
357
358     @type err: HTTPException
359     @param err: Exception instance
360
361     """
362     try:
363       (shortmsg, longmsg) = self.responses[err.code]
364     except KeyError:
365       shortmsg = longmsg = "Unknown"
366
367     if err.message:
368       message = err.message
369     else:
370       message = shortmsg
371
372     values = {
373       "code": err.code,
374       "message": cgi.escape(message),
375       "explain": longmsg,
376       }
377
378     self.response_code = err.code
379     self.response_content_type = self.error_content_type
380     self.response_body = self.error_message_format % values
381
382   def HandleRequest(self):
383     """Handle the actual request.
384
385     Calls the actual handler function and converts exceptions into HTTP errors.
386
387     """
388     # Don't do anything if there's already been a problem
389     if self.response_code != HTTP_OK:
390       return
391
392     assert self.request_method, "Status code %s requires a method" % HTTP_OK
393
394     # Check whether client is still there
395     self.rfile.read(0)
396
397     try:
398       try:
399         result = self._server.HandleRequest(self)
400
401         # TODO: Content-type
402         encoder = HTTPJsonConverter()
403         body = encoder.Encode(result)
404
405         self.response_content_type = encoder.CONTENT_TYPE
406         self.response_body = body
407       except (HTTPException, KeyboardInterrupt, SystemExit):
408         raise
409       except Exception, err:
410         logging.exception("Caught exception")
411         raise HTTPInternalError(message=str(err))
412       except:
413         logging.exception("Unknown exception")
414         raise HTTPInternalError(message="Unknown error")
415
416     except HTTPException, err:
417       self._SetErrorStatus(err)
418
419   def SendResponse(self):
420     """Sends response to the client.
421
422     """
423     # Check whether client is still there
424     self.rfile.read(0)
425
426     logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
427                  self.request_requestline, self.response_code)
428
429     if self.response_code in self.responses:
430       response_message = self.responses[self.response_code][0]
431     else:
432       response_message = ""
433
434     if self.request_version != HTTP_0_9:
435       self.wfile.write("%s %d %s\r\n" %
436                        (self.request_version, self.response_code,
437                         response_message))
438       self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
439       self._SendHeader(HTTP_DATE, self._DateTimeHeader())
440       self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
441       self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
442       for key, val in self.response_headers.iteritems():
443         self._SendHeader(key, val)
444
445       # We don't support keep-alive at this time
446       self._SendHeader(HTTP_CONNECTION, "close")
447       self.wfile.write("\r\n")
448
449     if (self.request_method != HTTP_HEAD and
450         self.response_code >= HTTP_OK and
451         self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
452       self.wfile.write(self.response_body)
453
454   def _SendHeader(self, name, value):
455     if self.request_version != HTTP_0_9:
456       self.wfile.write("%s: %s\r\n" % (name, value))
457
458   def _ReadRequest(self):
459     """Reads and parses request line
460
461     """
462     raw_requestline = self.rfile.readline()
463
464     requestline = raw_requestline
465     if requestline[-2:] == '\r\n':
466       requestline = requestline[:-2]
467     elif requestline[-1:] == '\n':
468       requestline = requestline[:-1]
469
470     if not requestline:
471       raise HTTPBadRequest("Empty request line")
472
473     self.request_requestline = requestline
474
475     logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
476
477     words = requestline.split()
478
479     if len(words) == 3:
480       [method, path, version] = words
481       if version[:5] != 'HTTP/':
482         raise HTTPBadRequest("Bad request version (%r)" % version)
483
484       try:
485         base_version_number = version.split('/', 1)[1]
486         version_number = base_version_number.split(".")
487
488         # RFC 2145 section 3.1 says there can be only one "." and
489         #   - major and minor numbers MUST be treated as
490         #      separate integers;
491         #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
492         #      turn is lower than HTTP/12.3;
493         #   - Leading zeros MUST be ignored by recipients.
494         if len(version_number) != 2:
495           raise HTTPBadRequest("Bad request version (%r)" % version)
496
497         version_number = int(version_number[0]), int(version_number[1])
498       except (ValueError, IndexError):
499         raise HTTPBadRequest("Bad request version (%r)" % version)
500
501       if version_number >= (2, 0):
502         raise HTTPVersionNotSupported("Invalid HTTP Version (%s)" %
503                                       base_version_number)
504
505     elif len(words) == 2:
506       version = HTTP_0_9
507       [method, path] = words
508       if method != HTTP_GET:
509         raise HTTPBadRequest("Bad HTTP/0.9 request type (%r)" % method)
510
511     else:
512       raise HTTPBadRequest("Bad request syntax (%r)" % requestline)
513
514     # Examine the headers and look for a Connection directive
515     headers = mimetools.Message(self.rfile, 0)
516
517     self.request_method = method
518     self.request_path = path
519     self.request_version = version
520     self.request_headers = headers
521
522   def _ReadPostData(self):
523     """Reads POST/PUT data
524
525     Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
526     a request is signaled by the inclusion of a Content-Length header field in
527     the request message headers. HTTP/1.0 requests containing an entity body
528     must include a valid Content-Length header field."
529
530     """
531     # While not according to specification, we only support an entity body for
532     # POST and PUT.
533     if (not self.request_method or
534         self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
535       self.request_post_data = None
536       return
537
538     content_length = None
539     try:
540       if HTTP_CONTENT_LENGTH in self.request_headers:
541         content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
542     except TypeError:
543       pass
544     except ValueError:
545       pass
546
547     # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
548     if content_length is None:
549       raise HTTPLengthRequired("Missing Content-Length header or"
550                                " invalid format")
551
552     data = self.rfile.read(content_length)
553
554     # TODO: Content-type, error handling
555     if data:
556       self.request_post_data = HTTPJsonConverter().Decode(data)
557     else:
558       self.request_post_data = None
559
560     logging.debug("HTTP POST data: %s", self.request_post_data)
561
562
563 class HttpServer(_HttpSocketBase):
564   """Generic HTTP server class
565
566   Users of this class must subclass it and override the HandleRequest function.
567
568   """
569   MAX_CHILDREN = 20
570
571   def __init__(self, mainloop, local_address, port,
572                ssl_key_path=None, ssl_cert_path=None, ssl_verify_peer=False):
573     """Initializes the HTTP server
574
575     @type mainloop: ganeti.daemon.Mainloop
576     @param mainloop: Mainloop used to poll for I/O events
577     @type local_addess: string
578     @param local_address: Local IP address to bind to
579     @type port: int
580     @param port: TCP port to listen on
581     @type ssl_key_path: string
582     @param ssl_key_path: Path to file containing SSL key in PEM format
583     @type ssl_cert_path: string
584     @param ssl_cert_path: Path to file containing SSL certificate in PEM format
585     @type ssl_verify_peer: bool
586     @param ssl_verify_peer: Whether to require client certificate and compare
587                             it with our certificate
588
589     """
590     _HttpSocketBase.__init__(self)
591
592     self.mainloop = mainloop
593     self.local_address = local_address
594     self.port = port
595
596     self.socket = self._CreateSocket(ssl_key_path, ssl_cert_path, ssl_verify_peer)
597
598     # Allow port to be reused
599     self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
600
601     if self._using_ssl:
602       self._fileio_class = _SSLFileObject
603     else:
604       self._fileio_class = socket._fileobject
605
606     self._children = []
607
608     mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
609     mainloop.RegisterSignal(self)
610
611   def Start(self):
612     self.socket.bind((self.local_address, self.port))
613     self.socket.listen(5)
614
615   def Stop(self):
616     self.socket.close()
617
618   def OnIO(self, fd, condition):
619     if condition & select.POLLIN:
620       self._IncomingConnection()
621
622   def OnSignal(self, signum):
623     if signum == signal.SIGCHLD:
624       self._CollectChildren(True)
625
626   def _CollectChildren(self, quick):
627     """Checks whether any child processes are done
628
629     @type quick: bool
630     @param quick: Whether to only use non-blocking functions
631
632     """
633     if not quick:
634       # Don't wait for other processes if it should be a quick check
635       while len(self._children) > self.MAX_CHILDREN:
636         try:
637           # Waiting without a timeout brings us into a potential DoS situation.
638           # As soon as too many children run, we'll not respond to new
639           # requests. The real solution would be to add a timeout for children
640           # and killing them after some time.
641           pid, status = os.waitpid(0, 0)
642         except os.error:
643           pid = None
644         if pid and pid in self._children:
645           self._children.remove(pid)
646
647     for child in self._children:
648       try:
649         pid, status = os.waitpid(child, os.WNOHANG)
650       except os.error:
651         pid = None
652       if pid and pid in self._children:
653         self._children.remove(pid)
654
655   def _IncomingConnection(self):
656     """Called for each incoming connection
657
658     """
659     (connection, client_addr) = self.socket.accept()
660
661     self._CollectChildren(False)
662
663     pid = os.fork()
664     if pid == 0:
665       # Child process
666       logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
667
668       try:
669         try:
670           try:
671             handler = None
672             try:
673               # Read, parse and handle request
674               handler = _HttpConnectionHandler(self, connection, client_addr,
675                                                self._fileio_class)
676               handler.HandleRequest()
677             finally:
678               # Try to send a response
679               if handler:
680                 handler.SendResponse()
681                 handler.Close()
682           except SocketClosed:
683             pass
684         finally:
685           logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
686       except:
687         logging.exception("Error while handling request from %s:%s",
688                           client_addr[0], client_addr[1])
689         os._exit(1)
690       os._exit(0)
691     else:
692       self._children.append(pid)
693
694   def HandleRequest(self, req):
695     raise NotImplementedError()
696
697
698 class HttpClientRequest(object):
699   def __init__(self, host, port, method, path, headers=None, post_data=None,
700                ssl_key_path=None, ssl_cert_path=None, ssl_verify_peer=False):
701     """Describes an HTTP request.
702
703     @type host: string
704     @param host: Hostname
705     @type port: int
706     @param port: Port
707     @type method: string
708     @param method: Method name
709     @type path: string
710     @param path: Request path
711     @type headers: dict or None
712     @param headers: Additional headers to send
713     @type post_data: string or None
714     @param post_data: Additional data to send
715
716     """
717     if post_data is not None:
718       assert method.upper() in (HTTP_POST, HTTP_PUT), \
719         "Only POST and GET requests support sending data"
720
721     assert path.startswith("/"), "Path must start with slash (/)"
722
723     self.host = host
724     self.port = port
725     self.ssl_key_path = ssl_key_path
726     self.ssl_cert_path = ssl_cert_path
727     self.ssl_verify_peer = ssl_verify_peer
728     self.method = method
729     self.path = path
730     self.headers = headers
731     self.post_data = post_data
732
733     self.success = None
734     self.error = None
735
736     self.resp_status_line = None
737     self.resp_version = None
738     self.resp_status = None
739     self.resp_reason = None
740     self.resp_headers = None
741     self.resp_body = None
742
743
744 class HttpClientRequestExecutor(_HttpSocketBase):
745   # Default headers
746   DEFAULT_HEADERS = {
747     HTTP_USER_AGENT: HTTP_GANETI_VERSION,
748     # TODO: For keep-alive, don't send "Connection: close"
749     HTTP_CONNECTION: "close",
750     }
751
752   # Length limits
753   STATUS_LINE_LENGTH_MAX = 512
754   HEADER_LENGTH_MAX = 4 * 1024
755
756   # Timeouts in seconds for socket layer
757   # TODO: Make read timeout configurable per OpCode
758   CONNECT_TIMEOUT = 5.0
759   WRITE_TIMEOUT = 10
760   READ_TIMEOUT = None
761   CLOSE_TIMEOUT = 1
762
763   # Parser state machine
764   PS_STATUS_LINE = "status-line"
765   PS_HEADERS = "headers"
766   PS_BODY = "body"
767   PS_COMPLETE = "complete"
768
769   # Socket operations
770   (OP_SEND,
771    OP_RECV,
772    OP_CLOSE_CHECK,
773    OP_SHUTDOWN) = range(4)
774
775   def __init__(self, req):
776     """Initializes the HttpClientRequestExecutor class.
777
778     @type req: HttpClientRequest
779     @param req: Request object
780
781     """
782     _HttpSocketBase.__init__(self)
783
784     self.request = req
785
786     self.parser_status = self.PS_STATUS_LINE
787     self.header_buffer = StringIO()
788     self.body_buffer = StringIO()
789     self.content_length = None
790     self.server_will_close = None
791
792     self.poller = select.poll()
793
794     try:
795       # TODO: Implement connection caching/keep-alive
796       self.sock = self._CreateSocket(req.ssl_key_path,
797                                      req.ssl_cert_path,
798                                      req.ssl_verify_peer)
799
800       # Disable Python's timeout
801       self.sock.settimeout(None)
802
803       # Operate in non-blocking mode
804       self.sock.setblocking(0)
805
806       force_close = True
807       self._Connect()
808       try:
809         self._SendRequest()
810         self._ReadResponse()
811
812         # Only wait for server to close if we didn't have any exception.
813         force_close = False
814       finally:
815         self._CloseConnection(force_close)
816
817       self.sock.close()
818       self.sock = None
819
820       req.resp_body = self.body_buffer.getvalue()
821
822       req.success = True
823       req.error = None
824
825     except _HttpClientError, err:
826       req.success = False
827       req.error = str(err)
828
829   def _BuildRequest(self):
830     """Build HTTP request.
831
832     @rtype: string
833     @return: Complete request
834
835     """
836     # Headers
837     send_headers = self.DEFAULT_HEADERS.copy()
838
839     if self.request.headers:
840       send_headers.update(self.request.headers)
841
842     send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
843
844     if self.request.post_data:
845       send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
846
847     buf = StringIO()
848
849     # Add request line. We only support HTTP/1.0 (no chunked transfers and no
850     # keep-alive).
851     # TODO: For keep-alive, change to HTTP/1.1
852     buf.write("%s %s %s\r\n" % (self.request.method.upper(),
853                                 self.request.path, HTTP_1_0))
854
855     # Add headers
856     for name, value in send_headers.iteritems():
857       buf.write("%s: %s\r\n" % (name, value))
858
859     buf.write("\r\n")
860
861     if self.request.post_data:
862       buf.write(self.request.post_data)
863
864     return buf.getvalue()
865
866   def _ParseStatusLine(self):
867     """Parses the status line sent by the server.
868
869     """
870     line = self.request.resp_status_line
871
872     if not line:
873       raise _HttpClientError("Empty status line")
874
875     try:
876       [version, status, reason] = line.split(None, 2)
877     except ValueError:
878       try:
879         [version, status] = line.split(None, 1)
880         reason = ""
881       except ValueError:
882         version = HTTP_9_0
883
884     if version:
885       version = version.upper()
886
887     if version not in (HTTP_1_0, HTTP_1_1):
888       # We do not support HTTP/0.9, despite the specification requiring it
889       # (RFC2616, section 19.6)
890       raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
891                              line)
892
893     # The status code is a three-digit number
894     try:
895       status = int(status)
896       if status < 100 or status > 999:
897         status = -1
898     except ValueError:
899       status = -1
900
901     if status == -1:
902       raise _HttpClientError("Invalid status code (%r)" % line)
903
904     self.request.resp_version = version
905     self.request.resp_status = status
906     self.request.resp_reason = reason
907
908   def _WillServerCloseConnection(self):
909     """Evaluate whether server will close the connection.
910
911     @rtype: bool
912     @return: Whether server will close the connection
913
914     """
915     hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
916     if hdr_connection:
917       hdr_connection = hdr_connection.lower()
918
919     # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
920     if self.request.resp_version == HTTP_1_1:
921       return (hdr_connection and "close" in hdr_connection)
922
923     # Some HTTP/1.0 implementations have support for persistent connections,
924     # using rules different than HTTP/1.1.
925
926     # For older HTTP, Keep-Alive indicates persistent connection.
927     if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
928       return False
929
930     # At least Akamai returns a "Connection: Keep-Alive" header, which was
931     # supposed to be sent by the client.
932     if hdr_connection and "keep-alive" in hdr_connection:
933       return False
934
935     return True
936
937   def _ParseHeaders(self):
938     """Parses the headers sent by the server.
939
940     This function also adjusts internal variables based on the header values.
941
942     """
943     req = self.request
944
945     # Parse headers
946     self.header_buffer.seek(0, 0)
947     req.resp_headers = mimetools.Message(self.header_buffer, 0)
948
949     self.server_will_close = self._WillServerCloseConnection()
950
951     # Do we have a Content-Length header?
952     hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
953     if hdr_content_length:
954       try:
955         self.content_length = int(hdr_content_length)
956       except ValueError:
957         pass
958       if self.content_length is not None and self.content_length < 0:
959         self.content_length = None
960
961     # does the body have a fixed length? (of zero)
962     if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
963         100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
964       self.content_length = 0
965
966     # if the connection remains open and a content-length was not provided,
967     # then assume that the connection WILL close.
968     if self.content_length is None:
969       self.server_will_close = True
970
971   def _CheckStatusLineLength(self, length):
972     if length > self.STATUS_LINE_LENGTH_MAX:
973       raise _HttpClientError("Status line longer than %d chars" %
974                              self.STATUS_LINE_LENGTH_MAX)
975
976   def _CheckHeaderLength(self, length):
977     if length > self.HEADER_LENGTH_MAX:
978       raise _HttpClientError("Headers longer than %d chars" %
979                              self.HEADER_LENGTH_MAX)
980
981   def _ParseBuffer(self, buf, eof):
982     """Main function for HTTP response state machine.
983
984     @type buf: string
985     @param buf: Receive buffer
986     @type eof: bool
987     @param eof: Whether we've reached EOF on the socket
988     @rtype: string
989     @return: Updated receive buffer
990
991     """
992     if self.parser_status == self.PS_STATUS_LINE:
993       # Expect status line
994       idx = buf.find("\r\n")
995       if idx >= 0:
996         self.request.resp_status_line = buf[:idx]
997
998         self._CheckStatusLineLength(len(self.request.resp_status_line))
999
1000         # Remove status line, including CRLF
1001         buf = buf[idx + 2:]
1002
1003         self._ParseStatusLine()
1004
1005         self.parser_status = self.PS_HEADERS
1006       else:
1007         # Check whether incoming data is getting too large, otherwise we just
1008         # fill our read buffer.
1009         self._CheckStatusLineLength(len(buf))
1010
1011     if self.parser_status == self.PS_HEADERS:
1012       # Wait for header end
1013       idx = buf.find("\r\n\r\n")
1014       if idx >= 0:
1015         self.header_buffer.write(buf[:idx + 2])
1016
1017         self._CheckHeaderLength(self.header_buffer.tell())
1018
1019         # Remove headers, including CRLF
1020         buf = buf[idx + 4:]
1021
1022         self._ParseHeaders()
1023
1024         self.parser_status = self.PS_BODY
1025       else:
1026         # Check whether incoming data is getting too large, otherwise we just
1027         # fill our read buffer.
1028         self._CheckHeaderLength(len(buf))
1029
1030     if self.parser_status == self.PS_BODY:
1031       self.body_buffer.write(buf)
1032       buf = ""
1033
1034       # Check whether we've read everything
1035       if (eof or
1036           (self.content_length is not None and
1037            self.body_buffer.tell() >= self.content_length)):
1038         self.parser_status = self.PS_COMPLETE
1039
1040     return buf
1041
1042   def _WaitForCondition(self, event, timeout):
1043     """Waits for a condition to occur on the socket.
1044
1045     @type event: int
1046     @param event: ORed condition (see select module)
1047     @type timeout: float or None
1048     @param timeout: Timeout in seconds
1049     @rtype: int or None
1050     @return: None for timeout, otherwise occured conditions
1051
1052     """
1053     check = (event | select.POLLPRI |
1054              select.POLLNVAL | select.POLLHUP | select.POLLERR)
1055
1056     if timeout is not None:
1057       # Poller object expects milliseconds
1058       timeout *= 1000
1059
1060     self.poller.register(self.sock, event)
1061     try:
1062       while True:
1063         # TODO: If the main thread receives a signal and we have no timeout, we
1064         # could wait forever. This should check a global "quit" flag or
1065         # something every so often.
1066         io_events = self.poller.poll(timeout)
1067         if io_events:
1068           for (evfd, evcond) in io_events:
1069             if evcond & check:
1070               return evcond
1071         else:
1072           # Timeout
1073           return None
1074     finally:
1075       self.poller.unregister(self.sock)
1076
1077   def _SocketOperation(self, op, arg1, error_msg, timeout_msg):
1078     """Wrapper around socket functions.
1079
1080     This function abstracts error handling for socket operations, especially
1081     for the complicated interaction with OpenSSL.
1082
1083     """
1084     if op == self.OP_SEND:
1085       event_poll = select.POLLOUT
1086       event_check = select.POLLOUT
1087       timeout = self.WRITE_TIMEOUT
1088
1089     elif op in (self.OP_RECV, self.OP_CLOSE_CHECK):
1090       event_poll = select.POLLIN
1091       event_check = select.POLLIN | select.POLLPRI
1092       if op == self.OP_CLOSE_CHECK:
1093         timeout = self.CLOSE_TIMEOUT
1094       else:
1095         timeout = self.READ_TIMEOUT
1096
1097     elif op == self.OP_SHUTDOWN:
1098       event_poll = None
1099       event_check = None
1100
1101       # The timeout is only used when OpenSSL requests polling for a condition.
1102       # It is not advisable to have no timeout for shutdown.
1103       timeout = self.WRITE_TIMEOUT
1104
1105     else:
1106       raise AssertionError("Invalid socket operation")
1107
1108     # No override by default
1109     event_override = 0
1110
1111     while True:
1112       # Poll only for certain operations and when asked for by an override
1113       if (event_override or
1114           op in (self.OP_SEND, self.OP_RECV, self.OP_CLOSE_CHECK)):
1115         if event_override:
1116           wait_for_event = event_override
1117         else:
1118           wait_for_event = event_poll
1119
1120         event = self._WaitForCondition(wait_for_event, timeout)
1121         if event is None:
1122           raise _HttpClientTimeout(timeout_msg)
1123
1124         if (op == self.OP_RECV and
1125             event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
1126           return ""
1127
1128         if not event & wait_for_event:
1129           continue
1130
1131       # Reset override
1132       event_override = 0
1133
1134       try:
1135         try:
1136           if op == self.OP_SEND:
1137             return self.sock.send(arg1)
1138
1139           elif op in (self.OP_RECV, self.OP_CLOSE_CHECK):
1140             return self.sock.recv(arg1)
1141
1142           elif op == self.OP_SHUTDOWN:
1143             if self._using_ssl:
1144               # PyOpenSSL's shutdown() doesn't take arguments
1145               return self.sock.shutdown()
1146             else:
1147               return self.sock.shutdown(arg1)
1148
1149         except OpenSSL.SSL.WantWriteError:
1150           # OpenSSL wants to write, poll for POLLOUT
1151           event_override = select.POLLOUT
1152           continue
1153
1154         except OpenSSL.SSL.WantReadError:
1155           # OpenSSL wants to read, poll for POLLIN
1156           event_override = select.POLLIN | select.POLLPRI
1157           continue
1158
1159         except OpenSSL.SSL.WantX509LookupError:
1160           continue
1161
1162         except OpenSSL.SSL.SysCallError, err:
1163           if op == self.OP_SEND:
1164             # arg1 is the data when writing
1165             if err.args and err.args[0] == -1 and arg1 == "":
1166               # errors when writing empty strings are expected
1167               # and can be ignored
1168               return 0
1169
1170           elif op == self.OP_RECV:
1171             if err.args == (-1, _SSL_UNEXPECTED_EOF):
1172               return ""
1173
1174           raise socket.error(err.args)
1175
1176         except OpenSSL.SSL.Error, err:
1177           raise socket.error(err.args)
1178
1179       except socket.error, err:
1180         if err.args and err.args[0] == errno.EAGAIN:
1181           # Ignore EAGAIN
1182           continue
1183
1184         raise _HttpClientError("%s: %s" % (error_msg, str(err)))
1185
1186   def _Connect(self):
1187     """Non-blocking connect to host with timeout.
1188
1189     """
1190     connected = False
1191     while True:
1192       try:
1193         connect_error = self.sock.connect_ex((self.request.host,
1194                                               self.request.port))
1195       except socket.gaierror, err:
1196         raise _HttpClientError("Connection failed: %s" % str(err))
1197
1198       if connect_error == errno.EINTR:
1199         # Mask signals
1200         pass
1201
1202       elif connect_error == 0:
1203         # Connection established
1204         connected = True
1205         break
1206
1207       elif connect_error == errno.EINPROGRESS:
1208         # Connection started
1209         break
1210
1211       raise _HttpClientError("Connection failed (%s: %s)" %
1212                              (connect_error, os.strerror(connect_error)))
1213
1214     if not connected:
1215       # Wait for connection
1216       event = self._WaitForCondition(select.POLLOUT, self.CONNECT_TIMEOUT)
1217       if event is None:
1218         raise _HttpClientError("Timeout while connecting to server")
1219
1220       # Get error code
1221       connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1222       if connect_error != 0:
1223         raise _HttpClientError("Connection failed (%s: %s)" %
1224                                (connect_error, os.strerror(connect_error)))
1225
1226     # Enable TCP keep-alive
1227     self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1228
1229     # If needed, Linux specific options are available to change the TCP
1230     # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1231     # TCP_KEEPINTVL.
1232
1233   def _SendRequest(self):
1234     """Sends request to server.
1235
1236     """
1237     buf = self._BuildRequest()
1238
1239     while buf:
1240       # Send only 4 KB at a time
1241       data = buf[:4096]
1242
1243       sent = self._SocketOperation(self.OP_SEND, data,
1244                                    "Error while sending request",
1245                                    "Timeout while sending request")
1246
1247       # Remove sent bytes
1248       buf = buf[sent:]
1249
1250     assert not buf, "Request wasn't sent completely"
1251
1252   def _ReadResponse(self):
1253     """Read response from server.
1254
1255     Calls the parser function after reading a chunk of data.
1256
1257     """
1258     buf = ""
1259     eof = False
1260     while self.parser_status != self.PS_COMPLETE:
1261       data = self._SocketOperation(self.OP_RECV, 4096,
1262                                    "Error while reading response",
1263                                    "Timeout while reading response")
1264
1265       if data:
1266         buf += data
1267       else:
1268         eof = True
1269
1270       # Do some parsing and error checking while more data arrives
1271       buf = self._ParseBuffer(buf, eof)
1272
1273       # Must be done only after the buffer has been evaluated
1274       if (eof and
1275           self.parser_status in (self.PS_STATUS_LINE,
1276                                  self.PS_HEADERS)):
1277         raise _HttpClientError("Connection closed prematurely")
1278
1279     # Parse rest
1280     buf = self._ParseBuffer(buf, True)
1281
1282     assert self.parser_status == self.PS_COMPLETE
1283     assert not buf, "Parser didn't read full response"
1284
1285   def _CloseConnection(self, force):
1286     """Closes the connection.
1287
1288     """
1289     if self.server_will_close and not force:
1290       # Wait for server to close
1291       try:
1292         # Check whether it's actually closed
1293         if not self._SocketOperation(self.OP_CLOSE_CHECK, 1,
1294                                      "Error", "Timeout"):
1295           return
1296       except (socket.error, _HttpClientError):
1297         # Ignore errors at this stage
1298         pass
1299
1300     # Close the connection from our side
1301     self._SocketOperation(self.OP_SHUTDOWN, socket.SHUT_RDWR,
1302                           "Error while shutting down connection",
1303                           "Timeout while shutting down connection")
1304
1305
1306 class _HttpClientPendingRequest(object):
1307   """Data class for pending requests.
1308
1309   """
1310   def __init__(self, request):
1311     self.request = request
1312
1313     # Thread synchronization
1314     self.done = threading.Event()
1315
1316
1317 class HttpClientWorker(workerpool.BaseWorker):
1318   """HTTP client worker class.
1319
1320   """
1321   def RunTask(self, pend_req):
1322     try:
1323       HttpClientRequestExecutor(pend_req.request)
1324     finally:
1325       pend_req.done.set()
1326
1327
1328 class HttpClientWorkerPool(workerpool.WorkerPool):
1329   def __init__(self, manager):
1330     workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
1331                                    HttpClientWorker)
1332     self.manager = manager
1333
1334
1335 class HttpClientManager(object):
1336   """Manages HTTP requests.
1337
1338   """
1339   def __init__(self):
1340     self._wpool = HttpClientWorkerPool(self)
1341
1342   def __del__(self):
1343     self.Shutdown()
1344
1345   def ExecRequests(self, requests):
1346     """Execute HTTP requests.
1347
1348     This function can be called from multiple threads at the same time.
1349
1350     @type requests: List of HttpClientRequest instances
1351     @param requests: The requests to execute
1352     @rtype: List of HttpClientRequest instances
1353     @returns: The list of requests passed in
1354
1355     """
1356     # _HttpClientPendingRequest is used for internal thread synchronization
1357     pending = [_HttpClientPendingRequest(req) for req in requests]
1358
1359     try:
1360       # Add requests to queue
1361       for pend_req in pending:
1362         self._wpool.AddTask(pend_req)
1363
1364     finally:
1365       # In case of an exception we should still wait for the rest, otherwise
1366       # another thread from the worker pool could modify the request object
1367       # after we returned.
1368
1369       # And wait for them to finish
1370       for pend_req in pending:
1371         pend_req.done.wait()
1372
1373     # Return original list
1374     return requests
1375
1376   def Shutdown(self):
1377     self._wpool.Quiesce()
1378     self._wpool.TerminateWorkers()
1379
1380
1381 class _SSLFileObject(object):
1382   """Wrapper around socket._fileobject
1383
1384   This wrapper is required to handle OpenSSL exceptions.
1385
1386   """
1387   def _RequireOpenSocket(fn):
1388     def wrapper(self, *args, **kwargs):
1389       if self.closed:
1390         raise SocketClosed("Socket is closed")
1391       return fn(self, *args, **kwargs)
1392     return wrapper
1393
1394   def __init__(self, sock, mode='rb', bufsize=-1):
1395     self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1396
1397   def _ConnectionLost(self):
1398     self._base = None
1399
1400   def _getclosed(self):
1401     return self._base is None or self._base.closed
1402   closed = property(_getclosed, doc="True if the file is closed")
1403
1404   @_RequireOpenSocket
1405   def close(self):
1406     return self._base.close()
1407
1408   @_RequireOpenSocket
1409   def flush(self):
1410     return self._base.flush()
1411
1412   @_RequireOpenSocket
1413   def fileno(self):
1414     return self._base.fileno()
1415
1416   @_RequireOpenSocket
1417   def read(self, size=-1):
1418     return self._ReadWrapper(self._base.read, size=size)
1419
1420   @_RequireOpenSocket
1421   def readline(self, size=-1):
1422     return self._ReadWrapper(self._base.readline, size=size)
1423
1424   def _ReadWrapper(self, fn, *args, **kwargs):
1425     while True:
1426       try:
1427         return fn(*args, **kwargs)
1428
1429       except OpenSSL.SSL.ZeroReturnError, err:
1430         self._ConnectionLost()
1431         return ""
1432
1433       except OpenSSL.SSL.WantReadError:
1434         continue
1435
1436       #except OpenSSL.SSL.WantWriteError:
1437       # TODO
1438
1439       except OpenSSL.SSL.SysCallError, (retval, desc):
1440         if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
1441             or retval > 0):
1442           self._ConnectionLost()
1443           return ""
1444
1445         logging.exception("Error in OpenSSL")
1446         self._ConnectionLost()
1447         raise socket.error(err.args)
1448
1449       except OpenSSL.SSL.Error, err:
1450         self._ConnectionLost()
1451         raise socket.error(err.args)
1452
1453   @_RequireOpenSocket
1454   def write(self, data):
1455     return self._WriteWrapper(self._base.write, data)
1456
1457   def _WriteWrapper(self, fn, *args, **kwargs):
1458     while True:
1459       try:
1460         return fn(*args, **kwargs)
1461       except OpenSSL.SSL.ZeroReturnError, err:
1462         self._ConnectionLost()
1463         return 0
1464
1465       except OpenSSL.SSL.WantWriteError:
1466         continue
1467
1468       #except OpenSSL.SSL.WantReadError:
1469       # TODO
1470
1471       except OpenSSL.SSL.SysCallError, err:
1472         if err.args[0] == -1 and data == "":
1473           # errors when writing empty strings are expected
1474           # and can be ignored
1475           return 0
1476
1477         self._ConnectionLost()
1478         raise socket.error(err.args)
1479
1480       except OpenSSL.SSL.Error, err:
1481         self._ConnectionLost()
1482         raise socket.error(err.args)