4 # Copyright (C) 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """HTTP client module.
25 # pylint: disable-msg=E1103
27 # # E1103: %s %r has no %r member (but some types could not be
28 # inferred), since _socketobject could be ssl or not and pylint
38 from ganeti import workerpool
39 from ganeti import http
40 from ganeti import utils
43 HTTP_CLIENT_THREADS = 10
46 class HttpClientRequest(object):
47 def __init__(self, host, port, method, path, headers=None, post_data=None,
48 ssl_params=None, ssl_verify_peer=False, read_timeout=None):
49 """Describes an HTTP request.
56 @param method: Method name
58 @param path: Request path
59 @type headers: dict or None
60 @param headers: Additional headers to send
61 @type post_data: string or None
62 @param post_data: Additional data to send
63 @type ssl_params: HttpSslParams
64 @param ssl_params: SSL key and certificate
65 @type ssl_verify_peer: bool
66 @param ssl_verify_peer: Whether to compare our certificate with
68 @type read_timeout: int
69 @param read_timeout: if passed, it will be used as the read
70 timeout while reading the response from the server
73 if post_data is not None:
74 assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
75 "Only POST and GET requests support sending data"
77 assert path.startswith("/"), "Path must start with slash (/)"
82 self.ssl_params = ssl_params
83 self.ssl_verify_peer = ssl_verify_peer
86 self.headers = headers
87 self.post_data = post_data
88 self.read_timeout = read_timeout
97 self.resp_version = None
98 self.resp_status_code = None
99 self.resp_reason = None
100 self.resp_headers = None
101 self.resp_body = None
104 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
105 "%s:%s" % (self.host, self.port),
109 return "<%s at %#x>" % (" ".join(status), id(self))
112 class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
116 class _HttpServerToClientMessageReader(http.HttpMessageReader):
118 START_LINE_LENGTH_MAX = 512
119 HEADER_LENGTH_MAX = 4096
121 def ParseStartLine(self, start_line):
122 """Parses the status line sent by the server.
125 # Empty lines are skipped when reading
129 [version, status, reason] = start_line.split(None, 2)
132 [version, status] = start_line.split(None, 1)
135 version = http.HTTP_0_9
138 version = version.upper()
140 # The status code is a three-digit number
143 if status < 100 or status > 999:
145 except (TypeError, ValueError):
149 raise http.HttpError("Invalid status code (%r)" % start_line)
151 return http.HttpServerToClientStartLine(version, status, reason)
154 class HttpClientRequestExecutor(http.HttpBase):
157 http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
158 # TODO: For keep-alive, don't send "Connection: close"
159 http.HTTP_CONNECTION: "close",
162 # Timeouts in seconds for socket layer
163 # TODO: Soft timeout instead of only socket timeout?
164 # TODO: Make read timeout configurable per OpCode?
170 def __init__(self, req):
171 """Initializes the HttpClientRequestExecutor class.
173 @type req: HttpClientRequest
174 @param req: Request object
177 http.HttpBase.__init__(self)
181 # TODO: Implement connection caching/keep-alive
182 self.sock = self._CreateSocket(req.ssl_params,
185 # Disable Python's timeout
186 self.sock.settimeout(None)
188 # Operate in non-blocking mode
189 self.sock.setblocking(0)
191 response_msg_reader = None
198 (response_msg_reader, response_msg) = self._ReadResponse()
200 # Only wait for server to close if we didn't have any exception.
203 # TODO: Keep-alive is not supported, always close connection
205 http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
206 self.WRITE_TIMEOUT, response_msg_reader,
212 req.response = response_msg
214 req.resp_version = req.response.start_line.version
215 req.resp_status_code = req.response.start_line.code
216 req.resp_reason = req.response.start_line.reason
217 req.resp_headers = req.response.headers
218 req.resp_body = req.response.body
223 except http.HttpError, err:
228 """Non-blocking connect to host with timeout.
234 connect_error = self.sock.connect_ex((self.request.host,
236 except socket.gaierror, err:
237 raise http.HttpError("Connection failed: %s" % str(err))
239 if connect_error == errno.EINTR:
243 elif connect_error == 0:
244 # Connection established
248 elif connect_error == errno.EINPROGRESS:
252 raise http.HttpError("Connection failed (%s: %s)" %
253 (connect_error, os.strerror(connect_error)))
256 # Wait for connection
257 event = utils.WaitForFdCondition(self.sock, select.POLLOUT,
258 self.CONNECT_TIMEOUT)
260 raise http.HttpError("Timeout while connecting to server")
263 connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
264 if connect_error != 0:
265 raise http.HttpError("Connection failed (%s: %s)" %
266 (connect_error, os.strerror(connect_error)))
268 # Enable TCP keep-alive
269 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
271 # If needed, Linux specific options are available to change the TCP
272 # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
275 # Do the secret SSL handshake
277 self.sock.set_connect_state() # pylint: disable-msg=E1103
279 http.Handshake(self.sock, self.WRITE_TIMEOUT)
280 except http.HttpSessionHandshakeUnexpectedEOF:
281 raise http.HttpError("Server closed connection during SSL handshake")
283 def _SendRequest(self):
284 """Sends request to server.
288 send_headers = self.DEFAULT_HEADERS.copy()
290 if self.request.headers:
291 send_headers.update(self.request.headers)
293 send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
297 msg = http.HttpMessage()
299 # Combine request line. We only support HTTP/1.0 (no chunked transfers and
301 # TODO: For keep-alive, change to HTTP/1.1
303 http.HttpClientToServerStartLine(method=self.request.method.upper(),
304 path=self.request.path,
305 version=http.HTTP_1_0)
306 msg.headers = send_headers
307 msg.body = self.request.post_data
310 _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
311 except http.HttpSocketTimeout:
312 raise http.HttpError("Timeout while sending request")
313 except socket.error, err:
314 raise http.HttpError("Error sending request: %s" % err)
316 def _ReadResponse(self):
317 """Read response from server.
320 response_msg = http.HttpMessage()
322 if self.request.read_timeout is None:
323 timeout = self.READ_TIMEOUT
325 timeout = self.request.read_timeout
328 response_msg_reader = \
329 _HttpServerToClientMessageReader(self.sock, response_msg, timeout)
330 except http.HttpSocketTimeout:
331 raise http.HttpError("Timeout while reading response")
332 except socket.error, err:
333 raise http.HttpError("Error reading response: %s" % err)
335 return (response_msg_reader, response_msg)
338 class _HttpClientPendingRequest(object):
339 """Data class for pending requests.
342 def __init__(self, request):
343 self.request = request
345 # Thread synchronization
346 self.done = threading.Event()
349 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
350 "req=%r" % self.request]
352 return "<%s at %#x>" % (" ".join(status), id(self))
355 class HttpClientWorker(workerpool.BaseWorker):
356 """HTTP client worker class.
359 def RunTask(self, pend_req): # pylint: disable-msg=W0221
361 HttpClientRequestExecutor(pend_req.request)
366 class HttpClientWorkerPool(workerpool.WorkerPool):
367 def __init__(self, manager):
368 workerpool.WorkerPool.__init__(self, "HttpClient",
371 self.manager = manager
374 class HttpClientManager(object):
375 """Manages HTTP requests.
379 self._wpool = HttpClientWorkerPool(self)
384 def ExecRequests(self, requests):
385 """Execute HTTP requests.
387 This function can be called from multiple threads at the same time.
389 @type requests: List of HttpClientRequest instances
390 @param requests: The requests to execute
391 @rtype: List of HttpClientRequest instances
392 @return: The list of requests passed in
395 # _HttpClientPendingRequest is used for internal thread synchronization
396 pending = [_HttpClientPendingRequest(req) for req in requests]
399 # Add requests to queue
400 for pend_req in pending:
401 self._wpool.AddTask(pend_req)
404 # In case of an exception we should still wait for the rest, otherwise
405 # another thread from the worker pool could modify the request object
408 # And wait for them to finish
409 for pend_req in pending:
412 # Return original list
416 self._wpool.Quiesce()
417 self._wpool.TerminateWorkers()