ganeti.http: Remove unused attribute "should_fork"
[ganeti-local] / lib / http.py
1 #
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 # General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16 # 02110-1301, USA.
17
18 """HTTP server module.
19
20 """
21
22 import BaseHTTPServer
23 import cgi
24 import logging
25 import mimetools
26 import OpenSSL
27 import os
28 import select
29 import socket
30 import sys
31 import time
32 import signal
33 import logging
34 import errno
35 import threading
36
37 from cStringIO import StringIO
38
39 from ganeti import constants
40 from ganeti import serializer
41 from ganeti import workerpool
42 from ganeti import utils
43
44
45 HTTP_CLIENT_THREADS = 10
46
47 HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
48
49 WEEKDAYNAME = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
50 MONTHNAME = [None,
51              'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
52              'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
53
54 # Default error message
55 DEFAULT_ERROR_CONTENT_TYPE = "text/html"
56 DEFAULT_ERROR_MESSAGE = """\
57 <head>
58 <title>Error response</title>
59 </head>
60 <body>
61 <h1>Error response</h1>
62 <p>Error code %(code)d.
63 <p>Message: %(message)s.
64 <p>Error code explanation: %(code)s = %(explain)s.
65 </body>
66 """
67
68 HTTP_OK = 200
69 HTTP_NO_CONTENT = 204
70 HTTP_NOT_MODIFIED = 304
71
72 HTTP_0_9 = "HTTP/0.9"
73 HTTP_1_0 = "HTTP/1.0"
74 HTTP_1_1 = "HTTP/1.1"
75
76 HTTP_GET = "GET"
77 HTTP_HEAD = "HEAD"
78 HTTP_POST = "POST"
79 HTTP_PUT = "PUT"
80
81 HTTP_ETAG = "ETag"
82 HTTP_HOST = "Host"
83 HTTP_SERVER = "Server"
84 HTTP_DATE = "Date"
85 HTTP_USER_AGENT = "User-Agent"
86 HTTP_CONTENT_TYPE = "Content-Type"
87 HTTP_CONTENT_LENGTH = "Content-Length"
88 HTTP_CONNECTION = "Connection"
89 HTTP_KEEP_ALIVE = "Keep-Alive"
90
91 _SSL_UNEXPECTED_EOF = "Unexpected EOF"
92
93 # Socket operations
94 (SOCKOP_SEND,
95  SOCKOP_RECV,
96  SOCKOP_SHUTDOWN) = range(3)
97
98
99 class SocketClosed(socket.error):
100   pass
101
102
103 class _HttpClientError(Exception):
104   """Internal exception for HTTP client errors.
105
106   This should only be used for internal error reporting.
107
108   """
109
110
111 class _HttpSocketTimeout(Exception):
112   """Internal exception for socket timeouts.
113
114   This should only be used for internal error reporting.
115
116   """
117
118
119 class HTTPException(Exception):
120   code = None
121   message = None
122
123   def __init__(self, message=None):
124     Exception.__init__(self)
125     if message is not None:
126       self.message = message
127
128
129 class HTTPBadRequest(HTTPException):
130   code = 400
131
132
133 class HTTPForbidden(HTTPException):
134   code = 403
135
136
137 class HTTPNotFound(HTTPException):
138   code = 404
139
140
141 class HTTPGone(HTTPException):
142   code = 410
143
144
145 class HTTPLengthRequired(HTTPException):
146   code = 411
147
148
149 class HTTPInternalError(HTTPException):
150   code = 500
151
152
153 class HTTPNotImplemented(HTTPException):
154   code = 501
155
156
157 class HTTPServiceUnavailable(HTTPException):
158   code = 503
159
160
161 class HTTPVersionNotSupported(HTTPException):
162   code = 505
163
164
165 class HTTPJsonConverter:
166   CONTENT_TYPE = "application/json"
167
168   def Encode(self, data):
169     return serializer.DumpJson(data)
170
171   def Decode(self, data):
172     return serializer.LoadJson(data)
173
174
175 def WaitForSocketCondition(poller, sock, event, timeout):
176   """Waits for a condition to occur on the socket.
177
178   @type poller: select.Poller
179   @param poller: Poller object as created by select.poll()
180   @type sock: socket
181   @param socket: Wait for events on this socket
182   @type event: int
183   @param event: ORed condition (see select module)
184   @type timeout: float or None
185   @param timeout: Timeout in seconds
186   @rtype: int or None
187   @return: None for timeout, otherwise occured conditions
188
189   """
190   check = (event | select.POLLPRI |
191            select.POLLNVAL | select.POLLHUP | select.POLLERR)
192
193   if timeout is not None:
194     # Poller object expects milliseconds
195     timeout *= 1000
196
197   poller.register(sock, event)
198   try:
199     while True:
200       # TODO: If the main thread receives a signal and we have no timeout, we
201       # could wait forever. This should check a global "quit" flag or
202       # something every so often.
203       io_events = poller.poll(timeout)
204       if not io_events:
205         # Timeout
206         return None
207       for (evfd, evcond) in io_events:
208         if evcond & check:
209           return evcond
210   finally:
211     poller.unregister(sock)
212
213
214 def SocketOperation(poller, sock, op, arg1, timeout):
215   """Wrapper around socket functions.
216
217   This function abstracts error handling for socket operations, especially
218   for the complicated interaction with OpenSSL.
219
220   @type poller: select.Poller
221   @param poller: Poller object as created by select.poll()
222   @type sock: socket
223   @param socket: Socket for the operation
224   @type op: int
225   @param op: Operation to execute (SOCKOP_* constants)
226   @type arg1: any
227   @param arg1: Parameter for function (if needed)
228   @type timeout: None or float
229   @param timeout: Timeout in seconds or None
230
231   """
232   # TODO: event_poll/event_check/override
233   if op == SOCKOP_SEND:
234     event_poll = select.POLLOUT
235     event_check = select.POLLOUT
236
237   elif op == SOCKOP_RECV:
238     event_poll = select.POLLIN
239     event_check = select.POLLIN | select.POLLPRI
240
241   elif op == SOCKOP_SHUTDOWN:
242     event_poll = None
243     event_check = None
244
245     # The timeout is only used when OpenSSL requests polling for a condition.
246     # It is not advisable to have no timeout for shutdown.
247     assert timeout
248
249   else:
250     raise AssertionError("Invalid socket operation")
251
252   # No override by default
253   event_override = 0
254
255   while True:
256     # Poll only for certain operations and when asked for by an override
257     if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
258       if event_override:
259         wait_for_event = event_override
260       else:
261         wait_for_event = event_poll
262
263       event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
264       if event is None:
265         raise _HttpSocketTimeout()
266
267       if (op == SOCKOP_RECV and
268           event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
269         return ""
270
271       if not event & wait_for_event:
272         continue
273
274     # Reset override
275     event_override = 0
276
277     try:
278       try:
279         if op == SOCKOP_SEND:
280           return sock.send(arg1)
281
282         elif op == SOCKOP_RECV:
283           return sock.recv(arg1)
284
285         elif op == SOCKOP_SHUTDOWN:
286           if isinstance(sock, OpenSSL.SSL.ConnectionType):
287             # PyOpenSSL's shutdown() doesn't take arguments
288             return sock.shutdown()
289           else:
290             return sock.shutdown(arg1)
291
292       except OpenSSL.SSL.WantWriteError:
293         # OpenSSL wants to write, poll for POLLOUT
294         event_override = select.POLLOUT
295         continue
296
297       except OpenSSL.SSL.WantReadError:
298         # OpenSSL wants to read, poll for POLLIN
299         event_override = select.POLLIN | select.POLLPRI
300         continue
301
302       except OpenSSL.SSL.WantX509LookupError:
303         continue
304
305       except OpenSSL.SSL.SysCallError, err:
306         if op == SOCKOP_SEND:
307           # arg1 is the data when writing
308           if err.args and err.args[0] == -1 and arg1 == "":
309             # errors when writing empty strings are expected
310             # and can be ignored
311             return 0
312
313         elif op == SOCKOP_RECV:
314           if err.args == (-1, _SSL_UNEXPECTED_EOF):
315             return ""
316
317         raise socket.error(err.args)
318
319       except OpenSSL.SSL.Error, err:
320         raise socket.error(err.args)
321
322     except socket.error, err:
323       if err.args and err.args[0] == errno.EAGAIN:
324         # Ignore EAGAIN
325         continue
326
327       raise
328
329
330 class HttpSslParams(object):
331   """Data class for SSL key and certificate.
332
333   """
334   def __init__(self, ssl_key_path, ssl_cert_path):
335     """Initializes this class.
336
337     @type ssl_key_path: string
338     @param ssl_key_path: Path to file containing SSL key in PEM format
339     @type ssl_cert_path: string
340     @param ssl_cert_path: Path to file containing SSL certificate in PEM format
341
342     """
343     self.ssl_key_pem = utils.ReadFile(ssl_key_path)
344     self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
345
346   def GetKey(self):
347     return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
348                                           self.ssl_key_pem)
349
350   def GetCertificate(self):
351     return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
352                                            self.ssl_cert_pem)
353
354
355 class _HttpSocketBase(object):
356   """Base class for HTTP server and client.
357
358   """
359   def __init__(self):
360     self._using_ssl = None
361     self._ssl_params = None
362     self._ssl_key = None
363     self._ssl_cert = None
364
365   def _CreateSocket(self, ssl_params, ssl_verify_peer):
366     """Creates a TCP socket and initializes SSL if needed.
367
368     @type ssl_params: HttpSslParams
369     @param ssl_params: SSL key and certificate
370     @type ssl_verify_peer: bool
371     @param ssl_verify_peer: Whether to require client certificate and compare
372                             it with our certificate
373
374     """
375     self._ssl_params = ssl_params
376
377     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
378
379     # Should we enable SSL?
380     self._using_ssl = ssl_params is not None
381
382     if not self._using_ssl:
383       return sock
384
385     self._ssl_key = ssl_params.GetKey()
386     self._ssl_cert = ssl_params.GetCertificate()
387
388     ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
389     ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
390
391     ctx.use_privatekey(self._ssl_key)
392     ctx.use_certificate(self._ssl_cert)
393     ctx.check_privatekey()
394
395     if ssl_verify_peer:
396       ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
397                      OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
398                      self._SSLVerifyCallback)
399
400     return OpenSSL.SSL.Connection(ctx, sock)
401
402   def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
403     """Verify the certificate provided by the peer
404
405     We only compare fingerprints. The client must use the same certificate as
406     we do on our side.
407
408     """
409     assert self._ssl_params, "SSL not initialized"
410
411     return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
412             self._ssl_cert.digest("md5") == cert.digest("md5"))
413
414
415 class HttpServerRequestExecutor(object):
416   """Implements server side of HTTP
417
418   This class implements the server side of HTTP. It's based on code of Python's
419   BaseHTTPServer, from both version 2.4 and 3k. It does not support non-ASCII
420   character encodings. Keep-alive connections are not supported.
421
422   """
423   # The default request version.  This only affects responses up until
424   # the point where the request line is parsed, so it mainly decides what
425   # the client gets back when sending a malformed request line.
426   # Most web servers default to HTTP 0.9, i.e. don't send a status line.
427   default_request_version = HTTP_0_9
428
429   # Error message settings
430   error_message_format = DEFAULT_ERROR_MESSAGE
431   error_content_type = DEFAULT_ERROR_CONTENT_TYPE
432
433   responses = BaseHTTPServer.BaseHTTPRequestHandler.responses
434
435   def __init__(self, server, conn, client_addr, fileio_class):
436     """Initializes this class.
437
438     Part of the initialization is reading the request and eventual POST/PUT
439     data sent by the client.
440
441     """
442     self._server = server
443
444     # We default rfile to buffered because otherwise it could be
445     # really slow for large data (a getc() call per byte); we make
446     # wfile unbuffered because (a) often after a write() we want to
447     # read and we need to flush the line; (b) big writes to unbuffered
448     # files are typically optimized by stdio even when big reads
449     # aren't.
450     self.rfile = fileio_class(conn, mode="rb", bufsize=-1)
451     self.wfile = fileio_class(conn, mode="wb", bufsize=0)
452
453     self.client_addr = client_addr
454
455     self.request_headers = None
456     self.request_method = None
457     self.request_path = None
458     self.request_requestline = None
459     self.request_version = self.default_request_version
460
461     self.response_body = None
462     self.response_code = HTTP_OK
463     self.response_content_type = None
464     self.response_headers = {}
465
466     logging.info("Connection from %s:%s", client_addr[0], client_addr[1])
467     try:
468       try:
469         try:
470           try:
471             # Read, parse and handle request
472             self._ReadRequest()
473             self._ReadPostData()
474             self._HandleRequest()
475           except HTTPException, err:
476             self._SetErrorStatus(err)
477         finally:
478           # Try to send a response
479           self._SendResponse()
480           self._Close()
481       except SocketClosed:
482         pass
483     finally:
484       logging.info("Disconnected %s:%s", client_addr[0], client_addr[1])
485
486   def _Close(self):
487     if not self.wfile.closed:
488       self.wfile.flush()
489     self.wfile.close()
490     self.rfile.close()
491
492   def _DateTimeHeader(self):
493     """Return the current date and time formatted for a message header.
494
495     """
496     (year, month, day, hh, mm, ss, wd, _, _) = time.gmtime()
497     return ("%s, %02d %3s %4d %02d:%02d:%02d GMT" %
498             (WEEKDAYNAME[wd], day, MONTHNAME[month], year, hh, mm, ss))
499
500   def _SetErrorStatus(self, err):
501     """Sets the response code and body from a HTTPException.
502
503     @type err: HTTPException
504     @param err: Exception instance
505
506     """
507     try:
508       (shortmsg, longmsg) = self.responses[err.code]
509     except KeyError:
510       shortmsg = longmsg = "Unknown"
511
512     if err.message:
513       message = err.message
514     else:
515       message = shortmsg
516
517     values = {
518       "code": err.code,
519       "message": cgi.escape(message),
520       "explain": longmsg,
521       }
522
523     self.response_code = err.code
524     self.response_content_type = self.error_content_type
525     self.response_body = self.error_message_format % values
526
527   def _HandleRequest(self):
528     """Handle the actual request.
529
530     Calls the actual handler function and converts exceptions into HTTP errors.
531
532     """
533     # Don't do anything if there's already been a problem
534     if self.response_code != HTTP_OK:
535       return
536
537     assert self.request_method, "Status code %s requires a method" % HTTP_OK
538
539     # Check whether client is still there
540     self.rfile.read(0)
541
542     try:
543       try:
544         result = self._server.HandleRequest(self)
545
546         # TODO: Content-type
547         encoder = HTTPJsonConverter()
548         body = encoder.Encode(result)
549
550         self.response_content_type = encoder.CONTENT_TYPE
551         self.response_body = body
552       except (HTTPException, KeyboardInterrupt, SystemExit):
553         raise
554       except Exception, err:
555         logging.exception("Caught exception")
556         raise HTTPInternalError(message=str(err))
557       except:
558         logging.exception("Unknown exception")
559         raise HTTPInternalError(message="Unknown error")
560
561     except HTTPException, err:
562       self._SetErrorStatus(err)
563
564   def _SendResponse(self):
565     """Sends response to the client.
566
567     """
568     # Check whether client is still there
569     self.rfile.read(0)
570
571     logging.info("%s:%s %s %s", self.client_addr[0], self.client_addr[1],
572                  self.request_requestline, self.response_code)
573
574     if self.response_code in self.responses:
575       response_message = self.responses[self.response_code][0]
576     else:
577       response_message = ""
578
579     if self.request_version != HTTP_0_9:
580       self.wfile.write("%s %d %s\r\n" %
581                        (self.request_version, self.response_code,
582                         response_message))
583       self._SendHeader(HTTP_SERVER, HTTP_GANETI_VERSION)
584       self._SendHeader(HTTP_DATE, self._DateTimeHeader())
585       self._SendHeader(HTTP_CONTENT_TYPE, self.response_content_type)
586       self._SendHeader(HTTP_CONTENT_LENGTH, str(len(self.response_body)))
587       for key, val in self.response_headers.iteritems():
588         self._SendHeader(key, val)
589
590       # We don't support keep-alive at this time
591       self._SendHeader(HTTP_CONNECTION, "close")
592       self.wfile.write("\r\n")
593
594     if (self.request_method != HTTP_HEAD and
595         self.response_code >= HTTP_OK and
596         self.response_code not in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED)):
597       self.wfile.write(self.response_body)
598
599   def _SendHeader(self, name, value):
600     if self.request_version != HTTP_0_9:
601       self.wfile.write("%s: %s\r\n" % (name, value))
602
603   def _ReadRequest(self):
604     """Reads and parses request line
605
606     """
607     raw_requestline = self.rfile.readline()
608
609     requestline = raw_requestline
610     if requestline[-2:] == '\r\n':
611       requestline = requestline[:-2]
612     elif requestline[-1:] == '\n':
613       requestline = requestline[:-1]
614
615     if not requestline:
616       raise HTTPBadRequest("Empty request line")
617
618     self.request_requestline = requestline
619
620     logging.debug("HTTP request: %s", raw_requestline.rstrip("\r\n"))
621
622     words = requestline.split()
623
624     if len(words) == 3:
625       [method, path, version] = words
626       if version[:5] != 'HTTP/':
627         raise HTTPBadRequest("Bad request version (%r)" % version)
628
629       try:
630         base_version_number = version.split('/', 1)[1]
631         version_number = base_version_number.split(".")
632
633         # RFC 2145 section 3.1 says there can be only one "." and
634         #   - major and minor numbers MUST be treated as
635         #      separate integers;
636         #   - HTTP/2.4 is a lower version than HTTP/2.13, which in
637         #      turn is lower than HTTP/12.3;
638         #   - Leading zeros MUST be ignored by recipients.
639         if len(version_number) != 2:
640           raise HTTPBadRequest("Bad request version (%r)" % version)
641
642         version_number = int(version_number[0]), int(version_number[1])
643       except (ValueError, IndexError):
644         raise HTTPBadRequest("Bad request version (%r)" % version)
645
646       if version_number >= (2, 0):
647         raise HTTPVersionNotSupported("Invalid HTTP Version (%s)" %
648                                       base_version_number)
649
650     elif len(words) == 2:
651       version = HTTP_0_9
652       [method, path] = words
653       if method != HTTP_GET:
654         raise HTTPBadRequest("Bad HTTP/0.9 request type (%r)" % method)
655
656     else:
657       raise HTTPBadRequest("Bad request syntax (%r)" % requestline)
658
659     # Examine the headers and look for a Connection directive
660     headers = mimetools.Message(self.rfile, 0)
661
662     self.request_method = method
663     self.request_path = path
664     self.request_version = version
665     self.request_headers = headers
666
667   def _ReadPostData(self):
668     """Reads POST/PUT data
669
670     Quoting RFC1945, section 7.2 (HTTP/1.0): "The presence of an entity body in
671     a request is signaled by the inclusion of a Content-Length header field in
672     the request message headers. HTTP/1.0 requests containing an entity body
673     must include a valid Content-Length header field."
674
675     """
676     # While not according to specification, we only support an entity body for
677     # POST and PUT.
678     if (not self.request_method or
679         self.request_method.upper() not in (HTTP_POST, HTTP_PUT)):
680       self.request_post_data = None
681       return
682
683     content_length = None
684     try:
685       if HTTP_CONTENT_LENGTH in self.request_headers:
686         content_length = int(self.request_headers[HTTP_CONTENT_LENGTH])
687     except TypeError:
688       pass
689     except ValueError:
690       pass
691
692     # 411 Length Required is specified in RFC2616, section 10.4.12 (HTTP/1.1)
693     if content_length is None:
694       raise HTTPLengthRequired("Missing Content-Length header or"
695                                " invalid format")
696
697     data = self.rfile.read(content_length)
698
699     # TODO: Content-type, error handling
700     if data:
701       self.request_post_data = HTTPJsonConverter().Decode(data)
702     else:
703       self.request_post_data = None
704
705     logging.debug("HTTP POST data: %s", self.request_post_data)
706
707
708 class HttpServer(_HttpSocketBase):
709   """Generic HTTP server class
710
711   Users of this class must subclass it and override the HandleRequest function.
712
713   """
714   MAX_CHILDREN = 20
715
716   def __init__(self, mainloop, local_address, port,
717                ssl_params=None, ssl_verify_peer=False):
718     """Initializes the HTTP server
719
720     @type mainloop: ganeti.daemon.Mainloop
721     @param mainloop: Mainloop used to poll for I/O events
722     @type local_addess: string
723     @param local_address: Local IP address to bind to
724     @type port: int
725     @param port: TCP port to listen on
726     @type ssl_params: HttpSslParams
727     @param ssl_params: SSL key and certificate
728     @type ssl_verify_peer: bool
729     @param ssl_verify_peer: Whether to require client certificate and compare
730                             it with our certificate
731
732     """
733     _HttpSocketBase.__init__(self)
734
735     self.mainloop = mainloop
736     self.local_address = local_address
737     self.port = port
738
739     self.socket = self._CreateSocket(ssl_params, ssl_verify_peer)
740
741     # Allow port to be reused
742     self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
743
744     if self._using_ssl:
745       self._fileio_class = _SSLFileObject
746     else:
747       self._fileio_class = socket._fileobject
748
749     self._children = []
750
751     mainloop.RegisterIO(self, self.socket.fileno(), select.POLLIN)
752     mainloop.RegisterSignal(self)
753
754   def Start(self):
755     self.socket.bind((self.local_address, self.port))
756     self.socket.listen(5)
757
758   def Stop(self):
759     self.socket.close()
760
761   def OnIO(self, fd, condition):
762     if condition & select.POLLIN:
763       self._IncomingConnection()
764
765   def OnSignal(self, signum):
766     if signum == signal.SIGCHLD:
767       self._CollectChildren(True)
768
769   def _CollectChildren(self, quick):
770     """Checks whether any child processes are done
771
772     @type quick: bool
773     @param quick: Whether to only use non-blocking functions
774
775     """
776     if not quick:
777       # Don't wait for other processes if it should be a quick check
778       while len(self._children) > self.MAX_CHILDREN:
779         try:
780           # Waiting without a timeout brings us into a potential DoS situation.
781           # As soon as too many children run, we'll not respond to new
782           # requests. The real solution would be to add a timeout for children
783           # and killing them after some time.
784           pid, status = os.waitpid(0, 0)
785         except os.error:
786           pid = None
787         if pid and pid in self._children:
788           self._children.remove(pid)
789
790     for child in self._children:
791       try:
792         pid, status = os.waitpid(child, os.WNOHANG)
793       except os.error:
794         pid = None
795       if pid and pid in self._children:
796         self._children.remove(pid)
797
798   def _IncomingConnection(self):
799     """Called for each incoming connection
800
801     """
802     (connection, client_addr) = self.socket.accept()
803
804     self._CollectChildren(False)
805
806     pid = os.fork()
807     if pid == 0:
808       # Child process
809       try:
810         HttpServerRequestExecutor(self, connection, client_addr,
811                                   self._fileio_class)
812       except:
813         logging.exception("Error while handling request from %s:%s",
814                           client_addr[0], client_addr[1])
815         os._exit(1)
816       os._exit(0)
817     else:
818       self._children.append(pid)
819
820   def HandleRequest(self, req):
821     raise NotImplementedError()
822
823
824 class HttpClientRequest(object):
825   def __init__(self, host, port, method, path, headers=None, post_data=None,
826                ssl_params=None, ssl_verify_peer=False):
827     """Describes an HTTP request.
828
829     @type host: string
830     @param host: Hostname
831     @type port: int
832     @param port: Port
833     @type method: string
834     @param method: Method name
835     @type path: string
836     @param path: Request path
837     @type headers: dict or None
838     @param headers: Additional headers to send
839     @type post_data: string or None
840     @param post_data: Additional data to send
841     @type ssl_params: HttpSslParams
842     @param ssl_params: SSL key and certificate
843     @type ssl_verify_peer: bool
844     @param ssl_verify_peer: Whether to compare our certificate with server's
845                             certificate
846
847     """
848     if post_data is not None:
849       assert method.upper() in (HTTP_POST, HTTP_PUT), \
850         "Only POST and GET requests support sending data"
851
852     assert path.startswith("/"), "Path must start with slash (/)"
853
854     self.host = host
855     self.port = port
856     self.ssl_params = ssl_params
857     self.ssl_verify_peer = ssl_verify_peer
858     self.method = method
859     self.path = path
860     self.headers = headers
861     self.post_data = post_data
862
863     self.success = None
864     self.error = None
865
866     self.resp_status_line = None
867     self.resp_version = None
868     self.resp_status = None
869     self.resp_reason = None
870     self.resp_headers = None
871     self.resp_body = None
872
873
874 class HttpClientRequestExecutor(_HttpSocketBase):
875   # Default headers
876   DEFAULT_HEADERS = {
877     HTTP_USER_AGENT: HTTP_GANETI_VERSION,
878     # TODO: For keep-alive, don't send "Connection: close"
879     HTTP_CONNECTION: "close",
880     }
881
882   # Length limits
883   STATUS_LINE_LENGTH_MAX = 512
884   HEADER_LENGTH_MAX = 4 * 1024
885
886   # Timeouts in seconds for socket layer
887   # TODO: Make read timeout configurable per OpCode
888   CONNECT_TIMEOUT = 5.0
889   WRITE_TIMEOUT = 10
890   READ_TIMEOUT = None
891   CLOSE_TIMEOUT = 1
892
893   # Parser state machine
894   PS_STATUS_LINE = "status-line"
895   PS_HEADERS = "headers"
896   PS_BODY = "body"
897   PS_COMPLETE = "complete"
898
899   def __init__(self, req):
900     """Initializes the HttpClientRequestExecutor class.
901
902     @type req: HttpClientRequest
903     @param req: Request object
904
905     """
906     _HttpSocketBase.__init__(self)
907
908     self.request = req
909
910     self.parser_status = self.PS_STATUS_LINE
911     self.header_buffer = StringIO()
912     self.body_buffer = StringIO()
913     self.content_length = None
914     self.server_will_close = None
915
916     self.poller = select.poll()
917
918     try:
919       # TODO: Implement connection caching/keep-alive
920       self.sock = self._CreateSocket(req.ssl_params,
921                                      req.ssl_verify_peer)
922
923       # Disable Python's timeout
924       self.sock.settimeout(None)
925
926       # Operate in non-blocking mode
927       self.sock.setblocking(0)
928
929       force_close = True
930       self._Connect()
931       try:
932         self._SendRequest()
933         self._ReadResponse()
934
935         # Only wait for server to close if we didn't have any exception.
936         force_close = False
937       finally:
938         self._CloseConnection(force_close)
939
940       self.sock.close()
941       self.sock = None
942
943       req.resp_body = self.body_buffer.getvalue()
944
945       req.success = True
946       req.error = None
947
948     except _HttpClientError, err:
949       req.success = False
950       req.error = str(err)
951
952   def _BuildRequest(self):
953     """Build HTTP request.
954
955     @rtype: string
956     @return: Complete request
957
958     """
959     # Headers
960     send_headers = self.DEFAULT_HEADERS.copy()
961
962     if self.request.headers:
963       send_headers.update(self.request.headers)
964
965     send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
966
967     if self.request.post_data:
968       send_headers[HTTP_CONTENT_LENGTH] = len(self.request.post_data)
969
970     buf = StringIO()
971
972     # Add request line. We only support HTTP/1.0 (no chunked transfers and no
973     # keep-alive).
974     # TODO: For keep-alive, change to HTTP/1.1
975     buf.write("%s %s %s\r\n" % (self.request.method.upper(),
976                                 self.request.path, HTTP_1_0))
977
978     # Add headers
979     for name, value in send_headers.iteritems():
980       buf.write("%s: %s\r\n" % (name, value))
981
982     buf.write("\r\n")
983
984     if self.request.post_data:
985       buf.write(self.request.post_data)
986
987     return buf.getvalue()
988
989   def _ParseStatusLine(self):
990     """Parses the status line sent by the server.
991
992     """
993     line = self.request.resp_status_line
994
995     if not line:
996       raise _HttpClientError("Empty status line")
997
998     try:
999       [version, status, reason] = line.split(None, 2)
1000     except ValueError:
1001       try:
1002         [version, status] = line.split(None, 1)
1003         reason = ""
1004       except ValueError:
1005         version = HTTP_9_0
1006
1007     if version:
1008       version = version.upper()
1009
1010     if version not in (HTTP_1_0, HTTP_1_1):
1011       # We do not support HTTP/0.9, despite the specification requiring it
1012       # (RFC2616, section 19.6)
1013       raise _HttpClientError("Only HTTP/1.0 and HTTP/1.1 are supported (%r)" %
1014                              line)
1015
1016     # The status code is a three-digit number
1017     try:
1018       status = int(status)
1019       if status < 100 or status > 999:
1020         status = -1
1021     except ValueError:
1022       status = -1
1023
1024     if status == -1:
1025       raise _HttpClientError("Invalid status code (%r)" % line)
1026
1027     self.request.resp_version = version
1028     self.request.resp_status = status
1029     self.request.resp_reason = reason
1030
1031   def _WillServerCloseConnection(self):
1032     """Evaluate whether server will close the connection.
1033
1034     @rtype: bool
1035     @return: Whether server will close the connection
1036
1037     """
1038     hdr_connection = self.request.resp_headers.get(HTTP_CONNECTION, None)
1039     if hdr_connection:
1040       hdr_connection = hdr_connection.lower()
1041
1042     # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
1043     if self.request.resp_version == HTTP_1_1:
1044       return (hdr_connection and "close" in hdr_connection)
1045
1046     # Some HTTP/1.0 implementations have support for persistent connections,
1047     # using rules different than HTTP/1.1.
1048
1049     # For older HTTP, Keep-Alive indicates persistent connection.
1050     if self.request.resp_headers.get(HTTP_KEEP_ALIVE):
1051       return False
1052
1053     # At least Akamai returns a "Connection: Keep-Alive" header, which was
1054     # supposed to be sent by the client.
1055     if hdr_connection and "keep-alive" in hdr_connection:
1056       return False
1057
1058     return True
1059
1060   def _ParseHeaders(self):
1061     """Parses the headers sent by the server.
1062
1063     This function also adjusts internal variables based on the header values.
1064
1065     """
1066     req = self.request
1067
1068     # Parse headers
1069     self.header_buffer.seek(0, 0)
1070     req.resp_headers = mimetools.Message(self.header_buffer, 0)
1071
1072     self.server_will_close = self._WillServerCloseConnection()
1073
1074     # Do we have a Content-Length header?
1075     hdr_content_length = req.resp_headers.get(HTTP_CONTENT_LENGTH, None)
1076     if hdr_content_length:
1077       try:
1078         self.content_length = int(hdr_content_length)
1079       except ValueError:
1080         pass
1081       if self.content_length is not None and self.content_length < 0:
1082         self.content_length = None
1083
1084     # does the body have a fixed length? (of zero)
1085     if (req.resp_status in (HTTP_NO_CONTENT, HTTP_NOT_MODIFIED) or
1086         100 <= req.resp_status < 200 or req.method == HTTP_HEAD):
1087       self.content_length = 0
1088
1089     # if the connection remains open and a content-length was not provided,
1090     # then assume that the connection WILL close.
1091     if self.content_length is None:
1092       self.server_will_close = True
1093
1094   def _CheckStatusLineLength(self, length):
1095     if length > self.STATUS_LINE_LENGTH_MAX:
1096       raise _HttpClientError("Status line longer than %d chars" %
1097                              self.STATUS_LINE_LENGTH_MAX)
1098
1099   def _CheckHeaderLength(self, length):
1100     if length > self.HEADER_LENGTH_MAX:
1101       raise _HttpClientError("Headers longer than %d chars" %
1102                              self.HEADER_LENGTH_MAX)
1103
1104   def _ParseBuffer(self, buf, eof):
1105     """Main function for HTTP response state machine.
1106
1107     @type buf: string
1108     @param buf: Receive buffer
1109     @type eof: bool
1110     @param eof: Whether we've reached EOF on the socket
1111     @rtype: string
1112     @return: Updated receive buffer
1113
1114     """
1115     if self.parser_status == self.PS_STATUS_LINE:
1116       # Expect status line
1117       idx = buf.find("\r\n")
1118       if idx >= 0:
1119         self.request.resp_status_line = buf[:idx]
1120
1121         self._CheckStatusLineLength(len(self.request.resp_status_line))
1122
1123         # Remove status line, including CRLF
1124         buf = buf[idx + 2:]
1125
1126         self._ParseStatusLine()
1127
1128         self.parser_status = self.PS_HEADERS
1129       else:
1130         # Check whether incoming data is getting too large, otherwise we just
1131         # fill our read buffer.
1132         self._CheckStatusLineLength(len(buf))
1133
1134     if self.parser_status == self.PS_HEADERS:
1135       # Wait for header end
1136       idx = buf.find("\r\n\r\n")
1137       if idx >= 0:
1138         self.header_buffer.write(buf[:idx + 2])
1139
1140         self._CheckHeaderLength(self.header_buffer.tell())
1141
1142         # Remove headers, including CRLF
1143         buf = buf[idx + 4:]
1144
1145         self._ParseHeaders()
1146
1147         self.parser_status = self.PS_BODY
1148       else:
1149         # Check whether incoming data is getting too large, otherwise we just
1150         # fill our read buffer.
1151         self._CheckHeaderLength(len(buf))
1152
1153     if self.parser_status == self.PS_BODY:
1154       self.body_buffer.write(buf)
1155       buf = ""
1156
1157       # Check whether we've read everything
1158       if (eof or
1159           (self.content_length is not None and
1160            self.body_buffer.tell() >= self.content_length)):
1161         self.parser_status = self.PS_COMPLETE
1162
1163     return buf
1164
1165   def _Connect(self):
1166     """Non-blocking connect to host with timeout.
1167
1168     """
1169     connected = False
1170     while True:
1171       try:
1172         connect_error = self.sock.connect_ex((self.request.host,
1173                                               self.request.port))
1174       except socket.gaierror, err:
1175         raise _HttpClientError("Connection failed: %s" % str(err))
1176
1177       if connect_error == errno.EINTR:
1178         # Mask signals
1179         pass
1180
1181       elif connect_error == 0:
1182         # Connection established
1183         connected = True
1184         break
1185
1186       elif connect_error == errno.EINPROGRESS:
1187         # Connection started
1188         break
1189
1190       raise _HttpClientError("Connection failed (%s: %s)" %
1191                              (connect_error, os.strerror(connect_error)))
1192
1193     if not connected:
1194       # Wait for connection
1195       event = WaitForSocketCondition(self.poller, self.sock,
1196                                      select.POLLOUT, self.CONNECT_TIMEOUT)
1197       if event is None:
1198         raise _HttpClientError("Timeout while connecting to server")
1199
1200       # Get error code
1201       connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
1202       if connect_error != 0:
1203         raise _HttpClientError("Connection failed (%s: %s)" %
1204                                (connect_error, os.strerror(connect_error)))
1205
1206     # Enable TCP keep-alive
1207     self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
1208
1209     # If needed, Linux specific options are available to change the TCP
1210     # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
1211     # TCP_KEEPINTVL.
1212
1213   def _SendRequest(self):
1214     """Sends request to server.
1215
1216     """
1217     buf = self._BuildRequest()
1218
1219     while buf:
1220       # Send only 4 KB at a time
1221       data = buf[:4096]
1222
1223       try:
1224         sent = SocketOperation(self.poller, self.sock, SOCKOP_SEND, data,
1225                                self.WRITE_TIMEOUT)
1226       except _HttpSocketTimeout:
1227         raise _HttpClientError("Timeout while sending request")
1228       except socket.error, err:
1229         raise _HttpClientError("Error sending request: %s" % err)
1230
1231       # Remove sent bytes
1232       buf = buf[sent:]
1233
1234     assert not buf, "Request wasn't sent completely"
1235
1236   def _ReadResponse(self):
1237     """Read response from server.
1238
1239     Calls the parser function after reading a chunk of data.
1240
1241     """
1242     buf = ""
1243     eof = False
1244     while self.parser_status != self.PS_COMPLETE:
1245       try:
1246         data = SocketOperation(self.poller, self.sock, SOCKOP_RECV, 4096,
1247                                self.READ_TIMEOUT)
1248       except _HttpSocketTimeout:
1249         raise _HttpClientError("Timeout while reading response")
1250       except socket.error, err:
1251         raise _HttpClientError("Error while reading response: %s" % err)
1252
1253       if data:
1254         buf += data
1255       else:
1256         eof = True
1257
1258       # Do some parsing and error checking while more data arrives
1259       buf = self._ParseBuffer(buf, eof)
1260
1261       # Must be done only after the buffer has been evaluated
1262       if (eof and
1263           self.parser_status in (self.PS_STATUS_LINE,
1264                                  self.PS_HEADERS)):
1265         raise _HttpClientError("Connection closed prematurely")
1266
1267     # Parse rest
1268     buf = self._ParseBuffer(buf, True)
1269
1270     assert self.parser_status == self.PS_COMPLETE
1271     assert not buf, "Parser didn't read full response"
1272
1273   def _CloseConnection(self, force):
1274     """Closes the connection.
1275
1276     """
1277     if self.server_will_close and not force:
1278       # Wait for server to close
1279       try:
1280         # Check whether it's actually closed
1281         if not SocketOperation(self.poller, self.sock, SOCKOP_RECV, 1,
1282                                self.CLOSE_TIMEOUT):
1283           return
1284       except (socket.error, _HttpClientError, _HttpSocketTimeout):
1285         # Ignore errors at this stage
1286         pass
1287
1288     # Close the connection from our side
1289     try:
1290       SocketOperation(self.poller, self.sock, SOCKOP_SHUTDOWN,
1291                       socket.SHUT_RDWR, self.WRITE_TIMEOUT)
1292     except _HttpSocketTimeout:
1293       raise _HttpClientError("Timeout while shutting down connection")
1294     except socket.error, err:
1295       raise _HttpClientError("Error while shutting down connection: %s" % err)
1296
1297
1298 class _HttpClientPendingRequest(object):
1299   """Data class for pending requests.
1300
1301   """
1302   def __init__(self, request):
1303     self.request = request
1304
1305     # Thread synchronization
1306     self.done = threading.Event()
1307
1308
1309 class HttpClientWorker(workerpool.BaseWorker):
1310   """HTTP client worker class.
1311
1312   """
1313   def RunTask(self, pend_req):
1314     try:
1315       HttpClientRequestExecutor(pend_req.request)
1316     finally:
1317       pend_req.done.set()
1318
1319
1320 class HttpClientWorkerPool(workerpool.WorkerPool):
1321   def __init__(self, manager):
1322     workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
1323                                    HttpClientWorker)
1324     self.manager = manager
1325
1326
1327 class HttpClientManager(object):
1328   """Manages HTTP requests.
1329
1330   """
1331   def __init__(self):
1332     self._wpool = HttpClientWorkerPool(self)
1333
1334   def __del__(self):
1335     self.Shutdown()
1336
1337   def ExecRequests(self, requests):
1338     """Execute HTTP requests.
1339
1340     This function can be called from multiple threads at the same time.
1341
1342     @type requests: List of HttpClientRequest instances
1343     @param requests: The requests to execute
1344     @rtype: List of HttpClientRequest instances
1345     @returns: The list of requests passed in
1346
1347     """
1348     # _HttpClientPendingRequest is used for internal thread synchronization
1349     pending = [_HttpClientPendingRequest(req) for req in requests]
1350
1351     try:
1352       # Add requests to queue
1353       for pend_req in pending:
1354         self._wpool.AddTask(pend_req)
1355
1356     finally:
1357       # In case of an exception we should still wait for the rest, otherwise
1358       # another thread from the worker pool could modify the request object
1359       # after we returned.
1360
1361       # And wait for them to finish
1362       for pend_req in pending:
1363         pend_req.done.wait()
1364
1365     # Return original list
1366     return requests
1367
1368   def Shutdown(self):
1369     self._wpool.Quiesce()
1370     self._wpool.TerminateWorkers()
1371
1372
1373 class _SSLFileObject(object):
1374   """Wrapper around socket._fileobject
1375
1376   This wrapper is required to handle OpenSSL exceptions.
1377
1378   """
1379   def _RequireOpenSocket(fn):
1380     def wrapper(self, *args, **kwargs):
1381       if self.closed:
1382         raise SocketClosed("Socket is closed")
1383       return fn(self, *args, **kwargs)
1384     return wrapper
1385
1386   def __init__(self, sock, mode='rb', bufsize=-1):
1387     self._base = socket._fileobject(sock, mode=mode, bufsize=bufsize)
1388
1389   def _ConnectionLost(self):
1390     self._base = None
1391
1392   def _getclosed(self):
1393     return self._base is None or self._base.closed
1394   closed = property(_getclosed, doc="True if the file is closed")
1395
1396   @_RequireOpenSocket
1397   def close(self):
1398     return self._base.close()
1399
1400   @_RequireOpenSocket
1401   def flush(self):
1402     return self._base.flush()
1403
1404   @_RequireOpenSocket
1405   def fileno(self):
1406     return self._base.fileno()
1407
1408   @_RequireOpenSocket
1409   def read(self, size=-1):
1410     return self._ReadWrapper(self._base.read, size=size)
1411
1412   @_RequireOpenSocket
1413   def readline(self, size=-1):
1414     return self._ReadWrapper(self._base.readline, size=size)
1415
1416   def _ReadWrapper(self, fn, *args, **kwargs):
1417     while True:
1418       try:
1419         return fn(*args, **kwargs)
1420
1421       except OpenSSL.SSL.ZeroReturnError, err:
1422         self._ConnectionLost()
1423         return ""
1424
1425       except OpenSSL.SSL.WantReadError:
1426         continue
1427
1428       #except OpenSSL.SSL.WantWriteError:
1429       # TODO
1430
1431       except OpenSSL.SSL.SysCallError, (retval, desc):
1432         if ((retval == -1 and desc == _SSL_UNEXPECTED_EOF)
1433             or retval > 0):
1434           self._ConnectionLost()
1435           return ""
1436
1437         logging.exception("Error in OpenSSL")
1438         self._ConnectionLost()
1439         raise socket.error(err.args)
1440
1441       except OpenSSL.SSL.Error, err:
1442         self._ConnectionLost()
1443         raise socket.error(err.args)
1444
1445   @_RequireOpenSocket
1446   def write(self, data):
1447     return self._WriteWrapper(self._base.write, data)
1448
1449   def _WriteWrapper(self, fn, *args, **kwargs):
1450     while True:
1451       try:
1452         return fn(*args, **kwargs)
1453       except OpenSSL.SSL.ZeroReturnError, err:
1454         self._ConnectionLost()
1455         return 0
1456
1457       except OpenSSL.SSL.WantWriteError:
1458         continue
1459
1460       #except OpenSSL.SSL.WantReadError:
1461       # TODO
1462
1463       except OpenSSL.SSL.SysCallError, err:
1464         if err.args[0] == -1 and data == "":
1465           # errors when writing empty strings are expected
1466           # and can be ignored
1467           return 0
1468
1469         self._ConnectionLost()
1470         raise socket.error(err.args)
1471
1472       except OpenSSL.SSL.Error, err:
1473         self._ConnectionLost()
1474         raise socket.error(err.args)