ganeti.http: Split HTTP server and client into separate files
[ganeti-local] / lib / http / client.py
1 #
2 #
3
4 # Copyright (C) 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """HTTP client module.
22
23 """
24
25 import BaseHTTPServer
26 import cgi
27 import logging
28 import OpenSSL
29 import os
30 import select
31 import socket
32 import sys
33 import time
34 import signal
35 import errno
36 import threading
37
38 from ganeti import constants
39 from ganeti import serializer
40 from ganeti import workerpool
41 from ganeti import utils
42 from ganeti import http
43
44
45 HTTP_CLIENT_THREADS = 10
46
47
48 class HttpClientRequest(object):
49   def __init__(self, host, port, method, path, headers=None, post_data=None,
50                ssl_params=None, ssl_verify_peer=False):
51     """Describes an HTTP request.
52
53     @type host: string
54     @param host: Hostname
55     @type port: int
56     @param port: Port
57     @type method: string
58     @param method: Method name
59     @type path: string
60     @param path: Request path
61     @type headers: dict or None
62     @param headers: Additional headers to send
63     @type post_data: string or None
64     @param post_data: Additional data to send
65     @type ssl_params: HttpSslParams
66     @param ssl_params: SSL key and certificate
67     @type ssl_verify_peer: bool
68     @param ssl_verify_peer: Whether to compare our certificate with server's
69                             certificate
70
71     """
72     if post_data is not None:
73       assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
74         "Only POST and GET requests support sending data"
75
76     assert path.startswith("/"), "Path must start with slash (/)"
77
78     # Request attributes
79     self.host = host
80     self.port = port
81     self.ssl_params = ssl_params
82     self.ssl_verify_peer = ssl_verify_peer
83     self.method = method
84     self.path = path
85     self.headers = headers
86     self.post_data = post_data
87
88     self.success = None
89     self.error = None
90
91     # Raw response
92     self.response = None
93
94     # Response attributes
95     self.resp_version = None
96     self.resp_status_code = None
97     self.resp_reason = None
98     self.resp_headers = None
99     self.resp_body = None
100
101
102 class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
103   pass
104
105
106 class _HttpServerToClientMessageReader(http.HttpMessageReader):
107   # Length limits
108   START_LINE_LENGTH_MAX = 512
109   HEADER_LENGTH_MAX = 4096
110
111   def ParseStartLine(self, start_line):
112     """Parses the status line sent by the server.
113
114     """
115     # Empty lines are skipped when reading
116     assert start_line
117
118     try:
119       [version, status, reason] = start_line.split(None, 2)
120     except ValueError:
121       try:
122         [version, status] = start_line.split(None, 1)
123         reason = ""
124       except ValueError:
125         version = http.HTTP_0_9
126
127     if version:
128       version = version.upper()
129
130     # The status code is a three-digit number
131     try:
132       status = int(status)
133       if status < 100 or status > 999:
134         status = -1
135     except ValueError:
136       status = -1
137
138     if status == -1:
139       raise http.HttpError("Invalid status code (%r)" % start_line)
140
141     return http.HttpServerToClientStartLine(version, status, reason)
142
143
144 class HttpClientRequestExecutor(http.HttpSocketBase):
145   # Default headers
146   DEFAULT_HEADERS = {
147     http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
148     # TODO: For keep-alive, don't send "Connection: close"
149     http.HTTP_CONNECTION: "close",
150     }
151
152   # Timeouts in seconds for socket layer
153   # TODO: Soft timeout instead of only socket timeout?
154   # TODO: Make read timeout configurable per OpCode?
155   CONNECT_TIMEOUT = 5
156   WRITE_TIMEOUT = 10
157   READ_TIMEOUT = None
158   CLOSE_TIMEOUT = 1
159
160   def __init__(self, req):
161     """Initializes the HttpClientRequestExecutor class.
162
163     @type req: HttpClientRequest
164     @param req: Request object
165
166     """
167     http.HttpSocketBase.__init__(self)
168     self.request = req
169
170     self.poller = select.poll()
171
172     try:
173       # TODO: Implement connection caching/keep-alive
174       self.sock = self._CreateSocket(req.ssl_params,
175                                      req.ssl_verify_peer)
176
177       # Disable Python's timeout
178       self.sock.settimeout(None)
179
180       # Operate in non-blocking mode
181       self.sock.setblocking(0)
182
183       response_msg_reader = None
184       response_msg = None
185       force_close = True
186
187       self._Connect()
188       try:
189         self._SendRequest()
190         (response_msg_reader, response_msg) = self._ReadResponse()
191
192         # Only wait for server to close if we didn't have any exception.
193         force_close = False
194       finally:
195         # TODO: Keep-alive is not supported, always close connection
196         force_close = True
197         http.ShutdownConnection(self.poller, self.sock,
198                                 self.CLOSE_TIMEOUT, self.WRITE_TIMEOUT,
199                                 response_msg_reader, force_close)
200
201       self.sock.close()
202       self.sock = None
203
204       req.response = response_msg
205
206       req.resp_version = req.response.start_line.version
207       req.resp_status_code = req.response.start_line.code
208       req.resp_reason = req.response.start_line.reason
209       req.resp_headers = req.response.headers
210       req.resp_body = req.response.body
211
212       req.success = True
213       req.error = None
214
215     except http.HttpError, err:
216       req.success = False
217       req.error = str(err)
218
219   def _Connect(self):
220     """Non-blocking connect to host with timeout.
221
222     """
223     connected = False
224     while True:
225       try:
226         connect_error = self.sock.connect_ex((self.request.host,
227                                               self.request.port))
228       except socket.gaierror, err:
229         raise http.HttpError("Connection failed: %s" % str(err))
230
231       if connect_error == errno.EINTR:
232         # Mask signals
233         pass
234
235       elif connect_error == 0:
236         # Connection established
237         connected = True
238         break
239
240       elif connect_error == errno.EINPROGRESS:
241         # Connection started
242         break
243
244       raise http.HttpError("Connection failed (%s: %s)" %
245                              (connect_error, os.strerror(connect_error)))
246
247     if not connected:
248       # Wait for connection
249       event = http.WaitForSocketCondition(self.poller, self.sock,
250                                           select.POLLOUT, self.CONNECT_TIMEOUT)
251       if event is None:
252         raise http.HttpError("Timeout while connecting to server")
253
254       # Get error code
255       connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
256       if connect_error != 0:
257         raise http.HttpError("Connection failed (%s: %s)" %
258                                (connect_error, os.strerror(connect_error)))
259
260     # Enable TCP keep-alive
261     self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
262
263     # If needed, Linux specific options are available to change the TCP
264     # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
265     # TCP_KEEPINTVL.
266
267   def _SendRequest(self):
268     """Sends request to server.
269
270     """
271     # Headers
272     send_headers = self.DEFAULT_HEADERS.copy()
273
274     if self.request.headers:
275       send_headers.update(self.request.headers)
276
277     send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
278
279     # Response message
280     msg = http.HttpMessage()
281
282     # Combine request line. We only support HTTP/1.0 (no chunked transfers and
283     # no keep-alive).
284     # TODO: For keep-alive, change to HTTP/1.1
285     msg.start_line = \
286       http.HttpClientToServerStartLine(method=self.request.method.upper(),
287                                        path=self.request.path, version=http.HTTP_1_0)
288     msg.headers = send_headers
289     msg.body = self.request.post_data
290
291     try:
292       _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
293     except http.HttpSocketTimeout:
294       raise http.HttpError("Timeout while sending request")
295     except socket.error, err:
296       raise http.HttpError("Error sending request: %s" % err)
297
298   def _ReadResponse(self):
299     """Read response from server.
300
301     """
302     response_msg = http.HttpMessage()
303
304     try:
305       response_msg_reader = \
306         _HttpServerToClientMessageReader(self.sock, response_msg,
307                                          self.READ_TIMEOUT)
308     except http.HttpSocketTimeout:
309       raise http.HttpError("Timeout while reading response")
310     except socket.error, err:
311       raise http.HttpError("Error reading response: %s" % err)
312
313     return (response_msg_reader, response_msg)
314
315
316 class _HttpClientPendingRequest(object):
317   """Data class for pending requests.
318
319   """
320   def __init__(self, request):
321     self.request = request
322
323     # Thread synchronization
324     self.done = threading.Event()
325
326
327 class HttpClientWorker(workerpool.BaseWorker):
328   """HTTP client worker class.
329
330   """
331   def RunTask(self, pend_req):
332     try:
333       HttpClientRequestExecutor(pend_req.request)
334     finally:
335       pend_req.done.set()
336
337
338 class HttpClientWorkerPool(workerpool.WorkerPool):
339   def __init__(self, manager):
340     workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
341                                    HttpClientWorker)
342     self.manager = manager
343
344
345 class HttpClientManager(object):
346   """Manages HTTP requests.
347
348   """
349   def __init__(self):
350     self._wpool = HttpClientWorkerPool(self)
351
352   def __del__(self):
353     self.Shutdown()
354
355   def ExecRequests(self, requests):
356     """Execute HTTP requests.
357
358     This function can be called from multiple threads at the same time.
359
360     @type requests: List of HttpClientRequest instances
361     @param requests: The requests to execute
362     @rtype: List of HttpClientRequest instances
363     @returns: The list of requests passed in
364
365     """
366     # _HttpClientPendingRequest is used for internal thread synchronization
367     pending = [_HttpClientPendingRequest(req) for req in requests]
368
369     try:
370       # Add requests to queue
371       for pend_req in pending:
372         self._wpool.AddTask(pend_req)
373
374     finally:
375       # In case of an exception we should still wait for the rest, otherwise
376       # another thread from the worker pool could modify the request object
377       # after we returned.
378
379       # And wait for them to finish
380       for pend_req in pending:
381         pend_req.done.wait()
382
383     # Return original list
384     return requests
385
386   def Shutdown(self):
387     self._wpool.Quiesce()
388     self._wpool.TerminateWorkers()