4 # Copyright (C) 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """HTTP client module.
38 from ganeti import constants
39 from ganeti import serializer
40 from ganeti import workerpool
41 from ganeti import utils
42 from ganeti import http
# Passed by HttpClientWorkerPool as the first argument to
# workerpool.WorkerPool.__init__; presumably the number of worker threads
# in the pool (TODO confirm against the workerpool module).
45 HTTP_CLIENT_THREADS = 10
# Describes one HTTP request. The resp_* attributes set to None at the end
# of __init__ are later assigned by HttpClientRequestExecutor.__init__ from
# the parsed response.
48 class HttpClientRequest(object):
49 def __init__(self, host, port, method, path, headers=None, post_data=None,
50 ssl_params=None, ssl_verify_peer=False):
51 """Describes an HTTP request.
58 @param method: Method name
60 @param path: Request path
61 @type headers: dict or None
62 @param headers: Additional headers to send
63 @type post_data: string or None
64 @param post_data: Additional data to send
65 @type ssl_params: HttpSslParams
66 @param ssl_params: SSL key and certificate
67 @type ssl_verify_peer: bool
68 @param ssl_verify_peer: Whether to compare our certificate with
72 if post_data is not None:
# NOTE(review): the check accepts POST and PUT, but the assertion message
# says "POST and GET" -- the message looks wrong; confirm and correct it.
# Being an assert, this validation disappears when Python runs with -O.
73 assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
74 "Only POST and GET requests support sending data"
# The path is sent verbatim in the request line by the executor, so it is
# required to be absolute.
76 assert path.startswith("/"), "Path must start with slash (/)"
# Request attributes. NOTE(review): the executor also reads host, method
# and path from the request; those assignments are not visible in this
# excerpt.
81 self.ssl_params = ssl_params
82 self.ssl_verify_peer = ssl_verify_peer
85 self.headers = headers
86 self.post_data = post_data
# Response attributes; None until the request has been executed.
95 self.resp_version = None
96 self.resp_status_code = None
97 self.resp_reason = None
98 self.resp_headers = None
# Message writer for the client-to-server direction; instantiated in
# HttpClientRequestExecutor._SendRequest with (socket, message, timeout).
# NOTE(review): the class body is not visible in this excerpt.
102 class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
# Message reader for the server-to-client direction; instantiated in
# HttpClientRequestExecutor._ReadResponse. ParseStartLine turns the status
# line into an http.HttpServerToClientStartLine.
106 class _HttpServerToClientMessageReader(http.HttpMessageReader):
# Size limits; presumably consumed by the http.HttpMessageReader base class
# (not visible here) -- TODO confirm units and enforcement.
108 START_LINE_LENGTH_MAX = 512
109 HEADER_LENGTH_MAX = 4096
111 def ParseStartLine(self, start_line):
112 """Parses the status line sent by the server.
115 # Empty lines are skipped when reading
# Preferred form: "version status reason"; split(None, 2) keeps any
# whitespace inside the reason phrase intact.
119 [version, status, reason] = start_line.split(None, 2)
# Fallback form without a reason phrase: "version status".
122 [version, status] = start_line.split(None, 1)
# NOTE(review): presumably the last resort when the line cannot be split at
# all; the surrounding try/except lines are not visible in this excerpt.
125 version = http.HTTP_0_9
128 version = version.upper()
130 # The status code is a three-digit number
133 if status < 100 or status > 999:
# Raised with the complete start line (repr) to ease debugging.
139 raise http.HttpError("Invalid status code (%r)" % start_line)
141 return http.HttpServerToClientStartLine(version, status, reason)
# Executes a single HttpClientRequest. The visible code creates a socket,
# connects without blocking, sends an HTTP/1.0 request, reads the response
# and copies its fields back onto the request object. Socket errors and
# timeouts are translated into http.HttpError.
144 class HttpClientRequestExecutor(http.HttpBase):
# Entries of the default header mapping, which _SendRequest copies via
# self.DEFAULT_HEADERS. The opening line of the mapping is not visible in
# this excerpt.
147 http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
148 # TODO: For keep-alive, don't send "Connection: close"
149 http.HTTP_CONNECTION: "close",
152 # Timeouts in seconds for socket layer
153 # TODO: Soft timeout instead of only socket timeout?
154 # TODO: Make read timeout configurable per OpCode?
# NOTE(review): the timeout constants themselves (CONNECT_TIMEOUT,
# WRITE_TIMEOUT and CLOSE_TIMEOUT are referenced below) are not visible in
# this excerpt.
# The constructor does the work: the results end up on req (response and
# resp_* attributes).
160 def __init__(self, req):
161 """Initializes the HttpClientRequestExecutor class.
163 @type req: HttpClientRequest
164 @param req: Request object
167 http.HttpBase.__init__(self)
171 # TODO: Implement connection caching/keep-alive
172 self.sock = self._CreateSocket(req.ssl_params,
175 # Disable Python's timeout
176 self.sock.settimeout(None)
178 # Operate in non-blocking mode
179 self.sock.setblocking(0)
# Initialised up front so the name is defined for the shutdown call below
# even if reading the response fails (presumably inside a try/finally that
# is not visible here).
181 response_msg_reader = None
188 (response_msg_reader, response_msg) = self._ReadResponse()
190 # Only wait for server to close if we didn't have any exception.
193 # TODO: Keep-alive is not supported, always close connection
195 http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
196 self.WRITE_TIMEOUT, response_msg_reader,
202 req.response = response_msg
# Copy the individual response fields onto the request for direct access.
204 req.resp_version = req.response.start_line.version
205 req.resp_status_code = req.response.start_line.code
206 req.resp_reason = req.response.start_line.reason
207 req.resp_headers = req.response.headers
208 req.resp_body = req.response.body
# Python 2 except syntax; the handler body is not visible in this excerpt.
213 except http.HttpError, err:
# NOTE(review): the def line of the method owning the following docstring is
# not visible in this excerpt (presumably the connect step).
218 """Non-blocking connect to host with timeout.
# connect_ex reports failures as an errno return value instead of raising;
# only name-resolution errors (gaierror) surface as exceptions.
224 connect_error = self.sock.connect_ex((self.request.host,
226 except socket.gaierror, err:
227 raise http.HttpError("Connection failed: %s" % str(err))
229 if connect_error == errno.EINTR:
233 elif connect_error == 0:
234 # Connection established
238 elif connect_error == errno.EINPROGRESS:
242 raise http.HttpError("Connection failed (%s: %s)" %
243 (connect_error, os.strerror(connect_error)))
246 # Wait for connection
# POLLOUT: presumably waits until the socket becomes writable or
# CONNECT_TIMEOUT expires -- confirm against http.WaitForSocketCondition.
247 event = http.WaitForSocketCondition(self.sock, select.POLLOUT,
248 self.CONNECT_TIMEOUT)
250 raise http.HttpError("Timeout while connecting to server")
# SO_ERROR carries the outcome of the asynchronous connect; non-zero means
# the connection attempt failed.
253 connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
254 if connect_error != 0:
255 raise http.HttpError("Connection failed (%s: %s)" %
256 (connect_error, os.strerror(connect_error)))
258 # Enable TCP keep-alive
259 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
261 # If needed, Linux specific options are available to change the TCP
262 # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
265 # Do the secret SSL handshake
# NOTE(review): set_connect_state looks like the pyOpenSSL Connection call
# that selects client mode; the condition guarding the SSL branch is not
# visible in this excerpt.
267 self.sock.set_connect_state()
269 http.Handshake(self.sock, self.WRITE_TIMEOUT)
270 except http.HttpSessionHandshakeUnexpectedEOF:
271 raise http.HttpError("Server closed connection during SSL handshake")
# Builds the request message (default headers overlaid with the request's
# own headers, a Host header, an HTTP/1.0 start line and the body) and
# writes it to the socket.
273 def _SendRequest(self):
274 """Sends request to server.
# Work on a copy so the class-level defaults are never modified.
278 send_headers = self.DEFAULT_HEADERS.copy()
280 if self.request.headers:
281 send_headers.update(self.request.headers)
# Assigned after the update above, so a caller-supplied Host header is
# overridden.
283 send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
287 msg = http.HttpMessage()
289 # Combine request line. We only support HTTP/1.0 (no chunked transfers and
291 # TODO: For keep-alive, change to HTTP/1.1
293 http.HttpClientToServerStartLine(method=self.request.method.upper(),
294 path=self.request.path,
295 version=http.HTTP_1_0)
296 msg.headers = send_headers
297 msg.body = self.request.post_data
# Constructing the writer is the whole call here; its result is not kept.
# Low-level failures are re-raised as http.HttpError.
300 _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
301 except http.HttpSocketTimeout:
302 raise http.HttpError("Timeout while sending request")
303 except socket.error, err:
304 raise http.HttpError("Error sending request: %s" % err)
# Reads the server's response into a fresh http.HttpMessage and returns
# the tuple (reader, message); the reader is later handed to
# http.ShutdownConnection by __init__.
306 def _ReadResponse(self):
307 """Read response from server.
310 response_msg = http.HttpMessage()
313 response_msg_reader = \
314 _HttpServerToClientMessageReader(self.sock, response_msg,
316 except http.HttpSocketTimeout:
317 raise http.HttpError("Timeout while reading response")
318 except socket.error, err:
319 raise http.HttpError("Error reading response: %s" % err)
321 return (response_msg_reader, response_msg)
# Pairs a request with a threading.Event named done. Instances are created
# by HttpClientManager.ExecRequests and handed to the worker pool;
# presumably the event is set once the request has been processed (the
# setting and waiting code is not visible in this excerpt).
324 class _HttpClientPendingRequest(object):
325 """Data class for pending requests.
328 def __init__(self, request):
329 self.request = request
331 # Thread synchronization
332 self.done = threading.Event()
# Worker-pool worker; RunTask receives a _HttpClientPendingRequest.
335 class HttpClientWorker(workerpool.BaseWorker):
336 """HTTP client worker class.
339 def RunTask(self, pend_req):
# Constructing the executor performs the request (all work happens in
# HttpClientRequestExecutor.__init__); the instance itself is discarded.
# NOTE(review): the lines surrounding this call are not visible in this
# excerpt.
341 HttpClientRequestExecutor(pend_req.request)
# Worker pool used by HttpClientManager; keeps a reference to the manager
# that created it.
346 class HttpClientWorkerPool(workerpool.WorkerPool):
347 def __init__(self, manager):
# NOTE(review): the remaining arguments of this base-class call are not
# visible in this excerpt.
348 workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
350 self.manager = manager
# Front end for executing batches of HTTP requests through a worker pool.
353 class HttpClientManager(object):
354 """Manages HTTP requests.
# NOTE(review): the def line owning the following assignment (presumably
# __init__) is not visible in this excerpt.
358 self._wpool = HttpClientWorkerPool(self)
363 def ExecRequests(self, requests):
364 """Execute HTTP requests.
366 This function can be called from multiple threads at the same time.
368 @type requests: List of HttpClientRequest instances
369 @param requests: The requests to execute
370 @rtype: List of HttpClientRequest instances
371 @return: The list of requests passed in
374 # _HttpClientPendingRequest is used for internal thread synchronization
375 pending = [_HttpClientPendingRequest(req) for req in requests]
378 # Add requests to queue
379 for pend_req in pending:
380 self._wpool.AddTask(pend_req)
383 # In case of an exception we should still wait for the rest, otherwise
384 # another thread from the worker pool could modify the request object
387 # And wait for them to finish
# NOTE(review): the loop body (presumably a wait on pend_req.done) is not
# visible in this excerpt.
388 for pend_req in pending:
391 # Return original list
# NOTE(review): the def line owning the two calls below (presumably a
# shutdown method) is not visible in this excerpt.
395 self._wpool.Quiesce()
396 self._wpool.TerminateWorkers()