4 # Copyright (C) 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """HTTP client module.
38 from ganeti import constants
39 from ganeti import serializer
40 from ganeti import workerpool
41 from ganeti import utils
42 from ganeti import http
45 HTTP_CLIENT_THREADS = 10
48 class HttpClientRequest(object):
49 def __init__(self, host, port, method, path, headers=None, post_data=None,
50 ssl_params=None, ssl_verify_peer=False):
51 """Describes an HTTP request.
58 @param method: Method name
60 @param path: Request path
61 @type headers: dict or None
62 @param headers: Additional headers to send
63 @type post_data: string or None
64 @param post_data: Additional data to send
65 @type ssl_params: HttpSslParams
66 @param ssl_params: SSL key and certificate
67 @type ssl_verify_peer: bool
68 @param ssl_verify_peer: Whether to compare our certificate with server's
# A request body is only meaningful for POST and PUT; refuse data on any
# other method. The message now names the same methods the check accepts.
if post_data is not None:
  assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
    "Only POST and PUT requests support sending data"
76 assert path.startswith("/"), "Path must start with slash (/)"
81 self.ssl_params = ssl_params
82 self.ssl_verify_peer = ssl_verify_peer
85 self.headers = headers
86 self.post_data = post_data
95 self.resp_version = None
96 self.resp_status_code = None
97 self.resp_reason = None
98 self.resp_headers = None
102 class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
106 class _HttpServerToClientMessageReader(http.HttpMessageReader):
108 START_LINE_LENGTH_MAX = 512
109 HEADER_LENGTH_MAX = 4096
111 def ParseStartLine(self, start_line):
112 """Parses the status line sent by the server.
115 # Empty lines are skipped when reading
119 [version, status, reason] = start_line.split(None, 2)
122 [version, status] = start_line.split(None, 1)
125 version = http.HTTP_0_9
128 version = version.upper()
130 # The status code is a three-digit number
133 if status < 100 or status > 999:
139 raise http.HttpError("Invalid status code (%r)" % start_line)
141 return http.HttpServerToClientStartLine(version, status, reason)
144 class HttpClientRequestExecutor(http.HttpSocketBase):
147 http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
148 # TODO: For keep-alive, don't send "Connection: close"
149 http.HTTP_CONNECTION: "close",
152 # Timeouts in seconds for socket layer
153 # TODO: Soft timeout instead of only socket timeout?
154 # TODO: Make read timeout configurable per OpCode?
160 def __init__(self, req):
161 """Initializes the HttpClientRequestExecutor class.
163 @type req: HttpClientRequest
164 @param req: Request object
167 http.HttpSocketBase.__init__(self)
170 self.poller = select.poll()
173 # TODO: Implement connection caching/keep-alive
174 self.sock = self._CreateSocket(req.ssl_params,
177 # Disable Python's timeout
178 self.sock.settimeout(None)
180 # Operate in non-blocking mode
181 self.sock.setblocking(0)
183 response_msg_reader = None
190 (response_msg_reader, response_msg) = self._ReadResponse()
192 # Only wait for server to close if we didn't have any exception.
195 # TODO: Keep-alive is not supported, always close connection
197 http.ShutdownConnection(self.poller, self.sock,
198 self.CLOSE_TIMEOUT, self.WRITE_TIMEOUT,
199 response_msg_reader, force_close)
204 req.response = response_msg
206 req.resp_version = req.response.start_line.version
207 req.resp_status_code = req.response.start_line.code
208 req.resp_reason = req.response.start_line.reason
209 req.resp_headers = req.response.headers
210 req.resp_body = req.response.body
215 except http.HttpError, err:
220 """Non-blocking connect to host with timeout.
226 connect_error = self.sock.connect_ex((self.request.host,
228 except socket.gaierror, err:
229 raise http.HttpError("Connection failed: %s" % str(err))
231 if connect_error == errno.EINTR:
235 elif connect_error == 0:
236 # Connection established
240 elif connect_error == errno.EINPROGRESS:
244 raise http.HttpError("Connection failed (%s: %s)" %
245 (connect_error, os.strerror(connect_error)))
248 # Wait for connection
249 event = http.WaitForSocketCondition(self.poller, self.sock,
250 select.POLLOUT, self.CONNECT_TIMEOUT)
252 raise http.HttpError("Timeout while connecting to server")
255 connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
256 if connect_error != 0:
257 raise http.HttpError("Connection failed (%s: %s)" %
258 (connect_error, os.strerror(connect_error)))
260 # Enable TCP keep-alive
261 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
263 # If needed, Linux specific options are available to change the TCP
264 # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
267 def _SendRequest(self):
268 """Sends request to server.
272 send_headers = self.DEFAULT_HEADERS.copy()
274 if self.request.headers:
275 send_headers.update(self.request.headers)
277 send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
280 msg = http.HttpMessage()
282 # Combine request line. We only support HTTP/1.0 (no chunked transfers and
284 # TODO: For keep-alive, change to HTTP/1.1
286 http.HttpClientToServerStartLine(method=self.request.method.upper(),
287 path=self.request.path, version=http.HTTP_1_0)
288 msg.headers = send_headers
289 msg.body = self.request.post_data
292 _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
293 except http.HttpSocketTimeout:
294 raise http.HttpError("Timeout while sending request")
295 except socket.error, err:
296 raise http.HttpError("Error sending request: %s" % err)
298 def _ReadResponse(self):
299 """Read response from server.
302 response_msg = http.HttpMessage()
305 response_msg_reader = \
306 _HttpServerToClientMessageReader(self.sock, response_msg,
308 except http.HttpSocketTimeout:
309 raise http.HttpError("Timeout while reading response")
310 except socket.error, err:
311 raise http.HttpError("Error reading response: %s" % err)
313 return (response_msg_reader, response_msg)
316 class _HttpClientPendingRequest(object):
317 """Data class for pending requests.
320 def __init__(self, request):
321 self.request = request
323 # Thread synchronization
324 self.done = threading.Event()
327 class HttpClientWorker(workerpool.BaseWorker):
328 """HTTP client worker class.
331 def RunTask(self, pend_req):
333 HttpClientRequestExecutor(pend_req.request)
338 class HttpClientWorkerPool(workerpool.WorkerPool):
339 def __init__(self, manager):
340 workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
342 self.manager = manager
345 class HttpClientManager(object):
346 """Manages HTTP requests.
350 self._wpool = HttpClientWorkerPool(self)
355 def ExecRequests(self, requests):
356 """Execute HTTP requests.
358 This function can be called from multiple threads at the same time.
360 @type requests: List of HttpClientRequest instances
361 @param requests: The requests to execute
362 @rtype: List of HttpClientRequest instances
363 @returns: The list of requests passed in
366 # _HttpClientPendingRequest is used for internal thread synchronization
367 pending = [_HttpClientPendingRequest(req) for req in requests]
370 # Add requests to queue
371 for pend_req in pending:
372 self._wpool.AddTask(pend_req)
375 # In case of an exception we should still wait for the rest, otherwise
376 # another thread from the worker pool could modify the request object
379 # And wait for them to finish
380 for pend_req in pending:
383 # Return original list
387 self._wpool.Quiesce()
388 self._wpool.TerminateWorkers()