Remove old HTTP code
[ganeti-local] / lib / http / __init__.py
1 #
2 #
3
4 # Copyright (C) 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """HTTP module.
22
23 """
24
25 import logging
26 import mimetools
27 import OpenSSL
28 import select
29 import socket
30 import errno
31
32 from cStringIO import StringIO
33
34 from ganeti import constants
35 from ganeti import serializer
36 from ganeti import utils
37
38
39 HTTP_CLIENT_THREADS = 10
40
41 HTTP_GANETI_VERSION = "Ganeti %s" % constants.RELEASE_VERSION
42
43 HTTP_OK = 200
44 HTTP_NO_CONTENT = 204
45 HTTP_NOT_MODIFIED = 304
46
47 HTTP_0_9 = "HTTP/0.9"
48 HTTP_1_0 = "HTTP/1.0"
49 HTTP_1_1 = "HTTP/1.1"
50
51 HTTP_GET = "GET"
52 HTTP_HEAD = "HEAD"
53 HTTP_POST = "POST"
54 HTTP_PUT = "PUT"
55
56 HTTP_ETAG = "ETag"
57 HTTP_HOST = "Host"
58 HTTP_SERVER = "Server"
59 HTTP_DATE = "Date"
60 HTTP_USER_AGENT = "User-Agent"
61 HTTP_CONTENT_TYPE = "Content-Type"
62 HTTP_CONTENT_LENGTH = "Content-Length"
63 HTTP_CONNECTION = "Connection"
64 HTTP_KEEP_ALIVE = "Keep-Alive"
65
66 _SSL_UNEXPECTED_EOF = "Unexpected EOF"
67
68 # Socket operations
69 (SOCKOP_SEND,
70  SOCKOP_RECV,
71  SOCKOP_SHUTDOWN) = range(3)
72
73
74 class HttpError(Exception):
75   """Internal exception for HTTP errors.
76
77   This should only be used for internal error reporting.
78
79   """
80
81
82 class _HttpClientError(Exception):
83   """Internal exception for HTTP client errors.
84
85   This should only be used for internal error reporting.
86
87   """
88
89
90 class HttpSocketTimeout(Exception):
91   """Internal exception for socket timeouts.
92
93   This should only be used for internal error reporting.
94
95   """
96
97
98 class HttpException(Exception):
99   code = None
100   message = None
101
102   def __init__(self, message=None):
103     Exception.__init__(self)
104     if message is not None:
105       self.message = message
106
107
108 class HttpBadRequest(HttpException):
109   code = 400
110
111
112 class HttpForbidden(HttpException):
113   code = 403
114
115
116 class HttpNotFound(HttpException):
117   code = 404
118
119
120 class HttpGone(HttpException):
121   code = 410
122
123
124 class HttpLengthRequired(HttpException):
125   code = 411
126
127
128 class HttpInternalError(HttpException):
129   code = 500
130
131
132 class HttpNotImplemented(HttpException):
133   code = 501
134
135
136 class HttpServiceUnavailable(HttpException):
137   code = 503
138
139
140 class HttpVersionNotSupported(HttpException):
141   code = 505
142
143
144 class HttpJsonConverter:
145   CONTENT_TYPE = "application/json"
146
147   def Encode(self, data):
148     return serializer.DumpJson(data)
149
150   def Decode(self, data):
151     return serializer.LoadJson(data)
152
153
154 def WaitForSocketCondition(poller, sock, event, timeout):
155   """Waits for a condition to occur on the socket.
156
157   @type poller: select.Poller
158   @param poller: Poller object as created by select.poll()
159   @type sock: socket
160   @param socket: Wait for events on this socket
161   @type event: int
162   @param event: ORed condition (see select module)
163   @type timeout: float or None
164   @param timeout: Timeout in seconds
165   @rtype: int or None
166   @return: None for timeout, otherwise occured conditions
167
168   """
169   check = (event | select.POLLPRI |
170            select.POLLNVAL | select.POLLHUP | select.POLLERR)
171
172   if timeout is not None:
173     # Poller object expects milliseconds
174     timeout *= 1000
175
176   poller.register(sock, event)
177   try:
178     while True:
179       # TODO: If the main thread receives a signal and we have no timeout, we
180       # could wait forever. This should check a global "quit" flag or
181       # something every so often.
182       io_events = poller.poll(timeout)
183       if not io_events:
184         # Timeout
185         return None
186       for (evfd, evcond) in io_events:
187         if evcond & check:
188           return evcond
189   finally:
190     poller.unregister(sock)
191
192
193 def SocketOperation(poller, sock, op, arg1, timeout):
194   """Wrapper around socket functions.
195
196   This function abstracts error handling for socket operations, especially
197   for the complicated interaction with OpenSSL.
198
199   @type poller: select.Poller
200   @param poller: Poller object as created by select.poll()
201   @type sock: socket
202   @param socket: Socket for the operation
203   @type op: int
204   @param op: Operation to execute (SOCKOP_* constants)
205   @type arg1: any
206   @param arg1: Parameter for function (if needed)
207   @type timeout: None or float
208   @param timeout: Timeout in seconds or None
209
210   """
211   # TODO: event_poll/event_check/override
212   if op == SOCKOP_SEND:
213     event_poll = select.POLLOUT
214     event_check = select.POLLOUT
215
216   elif op == SOCKOP_RECV:
217     event_poll = select.POLLIN
218     event_check = select.POLLIN | select.POLLPRI
219
220   elif op == SOCKOP_SHUTDOWN:
221     event_poll = None
222     event_check = None
223
224     # The timeout is only used when OpenSSL requests polling for a condition.
225     # It is not advisable to have no timeout for shutdown.
226     assert timeout
227
228   else:
229     raise AssertionError("Invalid socket operation")
230
231   # No override by default
232   event_override = 0
233
234   while True:
235     # Poll only for certain operations and when asked for by an override
236     if event_override or op in (SOCKOP_SEND, SOCKOP_RECV):
237       if event_override:
238         wait_for_event = event_override
239       else:
240         wait_for_event = event_poll
241
242       event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
243       if event is None:
244         raise HttpSocketTimeout()
245
246       if (op == SOCKOP_RECV and
247           event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
248         return ""
249
250       if not event & wait_for_event:
251         continue
252
253     # Reset override
254     event_override = 0
255
256     try:
257       try:
258         if op == SOCKOP_SEND:
259           return sock.send(arg1)
260
261         elif op == SOCKOP_RECV:
262           return sock.recv(arg1)
263
264         elif op == SOCKOP_SHUTDOWN:
265           if isinstance(sock, OpenSSL.SSL.ConnectionType):
266             # PyOpenSSL's shutdown() doesn't take arguments
267             return sock.shutdown()
268           else:
269             return sock.shutdown(arg1)
270
271       except OpenSSL.SSL.WantWriteError:
272         # OpenSSL wants to write, poll for POLLOUT
273         event_override = select.POLLOUT
274         continue
275
276       except OpenSSL.SSL.WantReadError:
277         # OpenSSL wants to read, poll for POLLIN
278         event_override = select.POLLIN | select.POLLPRI
279         continue
280
281       except OpenSSL.SSL.WantX509LookupError:
282         continue
283
284       except OpenSSL.SSL.SysCallError, err:
285         if op == SOCKOP_SEND:
286           # arg1 is the data when writing
287           if err.args and err.args[0] == -1 and arg1 == "":
288             # errors when writing empty strings are expected
289             # and can be ignored
290             return 0
291
292         elif op == SOCKOP_RECV:
293           if err.args == (-1, _SSL_UNEXPECTED_EOF):
294             return ""
295
296         raise socket.error(err.args)
297
298       except OpenSSL.SSL.Error, err:
299         raise socket.error(err.args)
300
301     except socket.error, err:
302       if err.args and err.args[0] == errno.EAGAIN:
303         # Ignore EAGAIN
304         continue
305
306       raise
307
308
309 def ShutdownConnection(poller, sock, close_timeout, write_timeout, msgreader,
310                        force):
311   """Closes the connection.
312
313   """
314   poller = select.poll()
315
316   #print msgreader.peer_will_close, force
317   if msgreader and msgreader.peer_will_close and not force:
318     # Wait for peer to close
319     try:
320       # Check whether it's actually closed
321       if not SocketOperation(poller, sock, SOCKOP_RECV, 1, close_timeout):
322         return
323     except (socket.error, HttpError, HttpSocketTimeout):
324       # Ignore errors at this stage
325       pass
326
327   # Close the connection from our side
328   try:
329     SocketOperation(poller, sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
330                     write_timeout)
331   except HttpSocketTimeout:
332     raise HttpError("Timeout while shutting down connection")
333   except socket.error, err:
334     raise HttpError("Error while shutting down connection: %s" % err)
335
336
337 class HttpSslParams(object):
338   """Data class for SSL key and certificate.
339
340   """
341   def __init__(self, ssl_key_path, ssl_cert_path):
342     """Initializes this class.
343
344     @type ssl_key_path: string
345     @param ssl_key_path: Path to file containing SSL key in PEM format
346     @type ssl_cert_path: string
347     @param ssl_cert_path: Path to file containing SSL certificate in PEM format
348
349     """
350     self.ssl_key_pem = utils.ReadFile(ssl_key_path)
351     self.ssl_cert_pem = utils.ReadFile(ssl_cert_path)
352
353   def GetKey(self):
354     return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
355                                           self.ssl_key_pem)
356
357   def GetCertificate(self):
358     return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
359                                            self.ssl_cert_pem)
360
361
362 class HttpSocketBase(object):
363   """Base class for HTTP server and client.
364
365   """
366   def __init__(self):
367     self._using_ssl = None
368     self._ssl_params = None
369     self._ssl_key = None
370     self._ssl_cert = None
371
372   def _CreateSocket(self, ssl_params, ssl_verify_peer):
373     """Creates a TCP socket and initializes SSL if needed.
374
375     @type ssl_params: HttpSslParams
376     @param ssl_params: SSL key and certificate
377     @type ssl_verify_peer: bool
378     @param ssl_verify_peer: Whether to require client certificate and compare
379                             it with our certificate
380
381     """
382     self._ssl_params = ssl_params
383
384     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
385
386     # Should we enable SSL?
387     self._using_ssl = ssl_params is not None
388
389     if not self._using_ssl:
390       return sock
391
392     self._ssl_key = ssl_params.GetKey()
393     self._ssl_cert = ssl_params.GetCertificate()
394
395     ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
396     ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
397
398     ctx.use_privatekey(self._ssl_key)
399     ctx.use_certificate(self._ssl_cert)
400     ctx.check_privatekey()
401
402     if ssl_verify_peer:
403       ctx.set_verify(OpenSSL.SSL.VERIFY_PEER |
404                      OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
405                      self._SSLVerifyCallback)
406
407     return OpenSSL.SSL.Connection(ctx, sock)
408
409   def _SSLVerifyCallback(self, conn, cert, errnum, errdepth, ok):
410     """Verify the certificate provided by the peer
411
412     We only compare fingerprints. The client must use the same certificate as
413     we do on our side.
414
415     """
416     assert self._ssl_params, "SSL not initialized"
417
418     return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
419             self._ssl_cert.digest("md5") == cert.digest("md5"))
420
421
422 class HttpMessage(object):
423   """Data structure for HTTP message.
424
425   """
426   def __init__(self):
427     self.start_line = None
428     self.headers = None
429     self.body = None
430     self.decoded_body = None
431
432
433 class HttpClientToServerStartLine(object):
434   """Data structure for HTTP request start line.
435
436   """
437   def __init__(self, method, path, version):
438     self.method = method
439     self.path = path
440     self.version = version
441
442   def __str__(self):
443     return "%s %s %s" % (self.method, self.path, self.version)
444
445
446 class HttpServerToClientStartLine(object):
447   """Data structure for HTTP response start line.
448
449   """
450   def __init__(self, version, code, reason):
451     self.version = version
452     self.code = code
453     self.reason = reason
454
455   def __str__(self):
456     return "%s %s %s" % (self.version, self.code, self.reason)
457
458
459 class HttpMessageWriter(object):
460   """Writes an HTTP message to a socket.
461
462   """
463   def __init__(self, sock, msg, write_timeout):
464     self._msg = msg
465
466     self._PrepareMessage()
467
468     buf = self._FormatMessage()
469
470     poller = select.poll()
471     while buf:
472       # Send only 4 KB at a time
473       data = buf[:4096]
474
475       sent = SocketOperation(poller, sock, SOCKOP_SEND, data,
476                              write_timeout)
477
478       # Remove sent bytes
479       buf = buf[sent:]
480
481     assert not buf, "Message wasn't sent completely"
482
483   def _PrepareMessage(self):
484     """Prepares the HTTP message by setting mandatory headers.
485
486     """
487     # RFC2616, section 4.3: "The presence of a message-body in a request is
488     # signaled by the inclusion of a Content-Length or Transfer-Encoding header
489     # field in the request's message-headers."
490     if self._msg.body:
491       self._msg.headers[HTTP_CONTENT_LENGTH] = len(self._msg.body)
492
493   def _FormatMessage(self):
494     """Serializes the HTTP message into a string.
495
496     """
497     buf = StringIO()
498
499     # Add start line
500     buf.write(str(self._msg.start_line))
501     buf.write("\r\n")
502
503     # Add headers
504     if self._msg.start_line.version != HTTP_0_9:
505       for name, value in self._msg.headers.iteritems():
506         buf.write("%s: %s\r\n" % (name, value))
507
508     buf.write("\r\n")
509
510     # Add message body if needed
511     if self.HasMessageBody():
512       buf.write(self._msg.body)
513
514     elif self._msg.body:
515       logging.warning("Ignoring message body")
516
517     return buf.getvalue()
518
519   def HasMessageBody(self):
520     """Checks whether the HTTP message contains a body.
521
522     Can be overriden by subclasses.
523
524     """
525     return bool(self._msg.body)
526
527
528 class HttpMessageReader(object):
529   """Reads HTTP message from socket.
530
531   """
532   # Length limits
533   START_LINE_LENGTH_MAX = None
534   HEADER_LENGTH_MAX = None
535
536   # Parser state machine
537   PS_START_LINE = "start-line"
538   PS_HEADERS = "headers"
539   PS_BODY = "entity-body"
540   PS_COMPLETE = "complete"
541
542   def __init__(self, sock, msg, read_timeout):
543     self.sock = sock
544     self.msg = msg
545
546     self.poller = select.poll()
547     self.start_line_buffer = None
548     self.header_buffer = StringIO()
549     self.body_buffer = StringIO()
550     self.parser_status = self.PS_START_LINE
551     self.content_length = None
552     self.peer_will_close = None
553
554     buf = ""
555     eof = False
556     while self.parser_status != self.PS_COMPLETE:
557       data = SocketOperation(self.poller, sock, SOCKOP_RECV, 4096,
558                              read_timeout)
559
560       if data:
561         buf += data
562       else:
563         eof = True
564
565       # Do some parsing and error checking while more data arrives
566       buf = self._ContinueParsing(buf, eof)
567
568       # Must be done only after the buffer has been evaluated
569       # TODO: Connection-length < len(data read) and connection closed
570       if (eof and
571           self.parser_status in (self.PS_START_LINE,
572                                  self.PS_HEADERS)):
573         raise HttpError("Connection closed prematurely")
574
575     # Parse rest
576     buf = self._ContinueParsing(buf, True)
577
578     assert self.parser_status == self.PS_COMPLETE
579     assert not buf, "Parser didn't read full response"
580
581     msg.body = self.body_buffer.getvalue()
582
583     # TODO: Content-type, error handling
584     if msg.body:
585       msg.decoded_body = HttpJsonConverter().Decode(msg.body)
586     else:
587       msg.decoded_body = None
588
589     if msg.decoded_body:
590       logging.debug("Message body: %s", msg.decoded_body)
591
592   def _ContinueParsing(self, buf, eof):
593     """Main function for HTTP message state machine.
594
595     @type buf: string
596     @param buf: Receive buffer
597     @type eof: bool
598     @param eof: Whether we've reached EOF on the socket
599     @rtype: string
600     @return: Updated receive buffer
601
602     """
603     if self.parser_status == self.PS_START_LINE:
604       # Expect start line
605       while True:
606         idx = buf.find("\r\n")
607
608         # RFC2616, section 4.1: "In the interest of robustness, servers SHOULD
609         # ignore any empty line(s) received where a Request-Line is expected.
610         # In other words, if the server is reading the protocol stream at the
611         # beginning of a message and receives a CRLF first, it should ignore
612         # the CRLF."
613         if idx == 0:
614           # TODO: Limit number of CRLFs for safety?
615           buf = buf[:2]
616           continue
617
618         if idx > 0:
619           self.start_line_buffer = buf[:idx]
620
621           self._CheckStartLineLength(len(self.start_line_buffer))
622
623           # Remove status line, including CRLF
624           buf = buf[idx + 2:]
625
626           self.msg.start_line = self.ParseStartLine(self.start_line_buffer)
627
628           self.parser_status = self.PS_HEADERS
629         else:
630           # Check whether incoming data is getting too large, otherwise we just
631           # fill our read buffer.
632           self._CheckStartLineLength(len(buf))
633
634         break
635
636     # TODO: Handle messages without headers
637     if self.parser_status == self.PS_HEADERS:
638       # Wait for header end
639       idx = buf.find("\r\n\r\n")
640       if idx >= 0:
641         self.header_buffer.write(buf[:idx + 2])
642
643         self._CheckHeaderLength(self.header_buffer.tell())
644
645         # Remove headers, including CRLF
646         buf = buf[idx + 4:]
647
648         self._ParseHeaders()
649
650         self.parser_status = self.PS_BODY
651       else:
652         # Check whether incoming data is getting too large, otherwise we just
653         # fill our read buffer.
654         self._CheckHeaderLength(len(buf))
655
656     if self.parser_status == self.PS_BODY:
657       # TODO: Implement max size for body_buffer
658       self.body_buffer.write(buf)
659       buf = ""
660
661       # Check whether we've read everything
662       #
663       # RFC2616, section 4.4: "When a message-body is included with a message,
664       # the transfer-length of that body is determined by one of the following
665       # [...] 5. By the server closing the connection. (Closing the connection
666       # cannot be used to indicate the end of a request body, since that would
667       # leave no possibility for the server to send back a response.)"
668       if (eof or
669           self.content_length is None or
670           (self.content_length is not None and
671            self.body_buffer.tell() >= self.content_length)):
672         self.parser_status = self.PS_COMPLETE
673
674     return buf
675
676   def _CheckStartLineLength(self, length):
677     """Limits the start line buffer size.
678
679     @type length: int
680     @param length: Buffer size
681
682     """
683     if (self.START_LINE_LENGTH_MAX is not None and
684         length > self.START_LINE_LENGTH_MAX):
685       raise HttpError("Start line longer than %d chars" %
686                        self.START_LINE_LENGTH_MAX)
687
688   def _CheckHeaderLength(self, length):
689     """Limits the header buffer size.
690
691     @type length: int
692     @param length: Buffer size
693
694     """
695     if (self.HEADER_LENGTH_MAX is not None and
696         length > self.HEADER_LENGTH_MAX):
697       raise HttpError("Headers longer than %d chars" % self.HEADER_LENGTH_MAX)
698
699   def ParseStartLine(self, start_line):
700     """Parses the start line of a message.
701
702     Must be overriden by subclass.
703
704     @type start_line: string
705     @param start_line: Start line string
706
707     """
708     raise NotImplementedError()
709
710   def _WillPeerCloseConnection(self):
711     """Evaluate whether peer will close the connection.
712
713     @rtype: bool
714     @return: Whether peer will close the connection
715
716     """
717     # RFC2616, section 14.10: "HTTP/1.1 defines the "close" connection option
718     # for the sender to signal that the connection will be closed after
719     # completion of the response. For example,
720     #
721     #        Connection: close
722     #
723     # in either the request or the response header fields indicates that the
724     # connection SHOULD NOT be considered `persistent' (section 8.1) after the
725     # current request/response is complete."
726
727     hdr_connection = self.msg.headers.get(HTTP_CONNECTION, None)
728     if hdr_connection:
729       hdr_connection = hdr_connection.lower()
730
731     # An HTTP/1.1 server is assumed to stay open unless explicitly closed.
732     if self.msg.start_line.version == HTTP_1_1:
733       return (hdr_connection and "close" in hdr_connection)
734
735     # Some HTTP/1.0 implementations have support for persistent connections,
736     # using rules different than HTTP/1.1.
737
738     # For older HTTP, Keep-Alive indicates persistent connection.
739     if self.msg.headers.get(HTTP_KEEP_ALIVE):
740       return False
741
742     # At least Akamai returns a "Connection: Keep-Alive" header, which was
743     # supposed to be sent by the client.
744     if hdr_connection and "keep-alive" in hdr_connection:
745       return False
746
747     return True
748
749   def _ParseHeaders(self):
750     """Parses the headers.
751
752     This function also adjusts internal variables based on header values.
753
754     RFC2616, section 4.3: "The presence of a message-body in a request is
755     signaled by the inclusion of a Content-Length or Transfer-Encoding header
756     field in the request's message-headers."
757
758     """
759     # Parse headers
760     self.header_buffer.seek(0, 0)
761     self.msg.headers = mimetools.Message(self.header_buffer, 0)
762
763     self.peer_will_close = self._WillPeerCloseConnection()
764
765     # Do we have a Content-Length header?
766     hdr_content_length = self.msg.headers.get(HTTP_CONTENT_LENGTH, None)
767     if hdr_content_length:
768       try:
769         self.content_length = int(hdr_content_length)
770       except ValueError:
771         self.content_length = None
772       if self.content_length is not None and self.content_length < 0:
773         self.content_length = None
774
775     # if the connection remains open and a content-length was not provided,
776     # then assume that the connection WILL close.
777     if self.content_length is None:
778       self.peer_will_close = True