Fix _NOQUOTE regexp
[ganeti-local] / lib / http / client.py
1 #
2 #
3
4 # Copyright (C) 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """HTTP client module.
22
23 """
24
25 import BaseHTTPServer
26 import cgi
27 import logging
28 import OpenSSL
29 import os
30 import select
31 import socket
32 import sys
33 import time
34 import signal
35 import errno
36 import threading
37
38 from ganeti import constants
39 from ganeti import serializer
40 from ganeti import workerpool
41 from ganeti import utils
42 from ganeti import http
43
44
45 HTTP_CLIENT_THREADS = 10
46
47
48 class HttpClientRequest(object):
49   def __init__(self, host, port, method, path, headers=None, post_data=None,
50                ssl_params=None, ssl_verify_peer=False):
51     """Describes an HTTP request.
52
53     @type host: string
54     @param host: Hostname
55     @type port: int
56     @param port: Port
57     @type method: string
58     @param method: Method name
59     @type path: string
60     @param path: Request path
61     @type headers: dict or None
62     @param headers: Additional headers to send
63     @type post_data: string or None
64     @param post_data: Additional data to send
65     @type ssl_params: HttpSslParams
66     @param ssl_params: SSL key and certificate
67     @type ssl_verify_peer: bool
68     @param ssl_verify_peer: Whether to compare our certificate with
69         server's certificate
70
71     """
72     if post_data is not None:
73       assert method.upper() in (http.HTTP_POST, http.HTTP_PUT), \
74         "Only POST and GET requests support sending data"
75
76     assert path.startswith("/"), "Path must start with slash (/)"
77
78     # Request attributes
79     self.host = host
80     self.port = port
81     self.ssl_params = ssl_params
82     self.ssl_verify_peer = ssl_verify_peer
83     self.method = method
84     self.path = path
85     self.headers = headers
86     self.post_data = post_data
87
88     self.success = None
89     self.error = None
90
91     # Raw response
92     self.response = None
93
94     # Response attributes
95     self.resp_version = None
96     self.resp_status_code = None
97     self.resp_reason = None
98     self.resp_headers = None
99     self.resp_body = None
100
101
102 class _HttpClientToServerMessageWriter(http.HttpMessageWriter):
103   pass
104
105
106 class _HttpServerToClientMessageReader(http.HttpMessageReader):
107   # Length limits
108   START_LINE_LENGTH_MAX = 512
109   HEADER_LENGTH_MAX = 4096
110
111   def ParseStartLine(self, start_line):
112     """Parses the status line sent by the server.
113
114     """
115     # Empty lines are skipped when reading
116     assert start_line
117
118     try:
119       [version, status, reason] = start_line.split(None, 2)
120     except ValueError:
121       try:
122         [version, status] = start_line.split(None, 1)
123         reason = ""
124       except ValueError:
125         version = http.HTTP_0_9
126
127     if version:
128       version = version.upper()
129
130     # The status code is a three-digit number
131     try:
132       status = int(status)
133       if status < 100 or status > 999:
134         status = -1
135     except ValueError:
136       status = -1
137
138     if status == -1:
139       raise http.HttpError("Invalid status code (%r)" % start_line)
140
141     return http.HttpServerToClientStartLine(version, status, reason)
142
143
144 class HttpClientRequestExecutor(http.HttpBase):
145   # Default headers
146   DEFAULT_HEADERS = {
147     http.HTTP_USER_AGENT: http.HTTP_GANETI_VERSION,
148     # TODO: For keep-alive, don't send "Connection: close"
149     http.HTTP_CONNECTION: "close",
150     }
151
152   # Timeouts in seconds for socket layer
153   # TODO: Soft timeout instead of only socket timeout?
154   # TODO: Make read timeout configurable per OpCode?
155   CONNECT_TIMEOUT = 5
156   WRITE_TIMEOUT = 10
157   READ_TIMEOUT = None
158   CLOSE_TIMEOUT = 1
159
160   def __init__(self, req):
161     """Initializes the HttpClientRequestExecutor class.
162
163     @type req: HttpClientRequest
164     @param req: Request object
165
166     """
167     http.HttpBase.__init__(self)
168     self.request = req
169
170     try:
171       # TODO: Implement connection caching/keep-alive
172       self.sock = self._CreateSocket(req.ssl_params,
173                                      req.ssl_verify_peer)
174
175       # Disable Python's timeout
176       self.sock.settimeout(None)
177
178       # Operate in non-blocking mode
179       self.sock.setblocking(0)
180
181       response_msg_reader = None
182       response_msg = None
183       force_close = True
184
185       self._Connect()
186       try:
187         self._SendRequest()
188         (response_msg_reader, response_msg) = self._ReadResponse()
189
190         # Only wait for server to close if we didn't have any exception.
191         force_close = False
192       finally:
193         # TODO: Keep-alive is not supported, always close connection
194         force_close = True
195         http.ShutdownConnection(self.sock, self.CLOSE_TIMEOUT,
196                                 self.WRITE_TIMEOUT, response_msg_reader,
197                                 force_close)
198
199       self.sock.close()
200       self.sock = None
201
202       req.response = response_msg
203
204       req.resp_version = req.response.start_line.version
205       req.resp_status_code = req.response.start_line.code
206       req.resp_reason = req.response.start_line.reason
207       req.resp_headers = req.response.headers
208       req.resp_body = req.response.body
209
210       req.success = True
211       req.error = None
212
213     except http.HttpError, err:
214       req.success = False
215       req.error = str(err)
216
217   def _Connect(self):
218     """Non-blocking connect to host with timeout.
219
220     """
221     connected = False
222     while True:
223       try:
224         connect_error = self.sock.connect_ex((self.request.host,
225                                               self.request.port))
226       except socket.gaierror, err:
227         raise http.HttpError("Connection failed: %s" % str(err))
228
229       if connect_error == errno.EINTR:
230         # Mask signals
231         pass
232
233       elif connect_error == 0:
234         # Connection established
235         connected = True
236         break
237
238       elif connect_error == errno.EINPROGRESS:
239         # Connection started
240         break
241
242       raise http.HttpError("Connection failed (%s: %s)" %
243                              (connect_error, os.strerror(connect_error)))
244
245     if not connected:
246       # Wait for connection
247       event = http.WaitForSocketCondition(self.sock, select.POLLOUT,
248                                           self.CONNECT_TIMEOUT)
249       if event is None:
250         raise http.HttpError("Timeout while connecting to server")
251
252       # Get error code
253       connect_error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
254       if connect_error != 0:
255         raise http.HttpError("Connection failed (%s: %s)" %
256                                (connect_error, os.strerror(connect_error)))
257
258     # Enable TCP keep-alive
259     self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
260
261     # If needed, Linux specific options are available to change the TCP
262     # keep-alive settings, see "man 7 tcp" for TCP_KEEPCNT, TCP_KEEPIDLE and
263     # TCP_KEEPINTVL.
264
265     # Do the secret SSL handshake
266     if self.using_ssl:
267       self.sock.set_connect_state()
268       try:
269         http.Handshake(self.sock, self.WRITE_TIMEOUT)
270       except http.HttpSessionHandshakeUnexpectedEOF:
271         raise http.HttpError("Server closed connection during SSL handshake")
272
273   def _SendRequest(self):
274     """Sends request to server.
275
276     """
277     # Headers
278     send_headers = self.DEFAULT_HEADERS.copy()
279
280     if self.request.headers:
281       send_headers.update(self.request.headers)
282
283     send_headers[http.HTTP_HOST] = "%s:%s" % (self.request.host,
284                                               self.request.port)
285
286     # Response message
287     msg = http.HttpMessage()
288
289     # Combine request line. We only support HTTP/1.0 (no chunked transfers and
290     # no keep-alive).
291     # TODO: For keep-alive, change to HTTP/1.1
292     msg.start_line = \
293       http.HttpClientToServerStartLine(method=self.request.method.upper(),
294                                        path=self.request.path,
295                                        version=http.HTTP_1_0)
296     msg.headers = send_headers
297     msg.body = self.request.post_data
298
299     try:
300       _HttpClientToServerMessageWriter(self.sock, msg, self.WRITE_TIMEOUT)
301     except http.HttpSocketTimeout:
302       raise http.HttpError("Timeout while sending request")
303     except socket.error, err:
304       raise http.HttpError("Error sending request: %s" % err)
305
306   def _ReadResponse(self):
307     """Read response from server.
308
309     """
310     response_msg = http.HttpMessage()
311
312     try:
313       response_msg_reader = \
314         _HttpServerToClientMessageReader(self.sock, response_msg,
315                                          self.READ_TIMEOUT)
316     except http.HttpSocketTimeout:
317       raise http.HttpError("Timeout while reading response")
318     except socket.error, err:
319       raise http.HttpError("Error reading response: %s" % err)
320
321     return (response_msg_reader, response_msg)
322
323
324 class _HttpClientPendingRequest(object):
325   """Data class for pending requests.
326
327   """
328   def __init__(self, request):
329     self.request = request
330
331     # Thread synchronization
332     self.done = threading.Event()
333
334
335 class HttpClientWorker(workerpool.BaseWorker):
336   """HTTP client worker class.
337
338   """
339   def RunTask(self, pend_req):
340     try:
341       HttpClientRequestExecutor(pend_req.request)
342     finally:
343       pend_req.done.set()
344
345
346 class HttpClientWorkerPool(workerpool.WorkerPool):
347   def __init__(self, manager):
348     workerpool.WorkerPool.__init__(self, HTTP_CLIENT_THREADS,
349                                    HttpClientWorker)
350     self.manager = manager
351
352
353 class HttpClientManager(object):
354   """Manages HTTP requests.
355
356   """
357   def __init__(self):
358     self._wpool = HttpClientWorkerPool(self)
359
360   def __del__(self):
361     self.Shutdown()
362
363   def ExecRequests(self, requests):
364     """Execute HTTP requests.
365
366     This function can be called from multiple threads at the same time.
367
368     @type requests: List of HttpClientRequest instances
369     @param requests: The requests to execute
370     @rtype: List of HttpClientRequest instances
371     @return: The list of requests passed in
372
373     """
374     # _HttpClientPendingRequest is used for internal thread synchronization
375     pending = [_HttpClientPendingRequest(req) for req in requests]
376
377     try:
378       # Add requests to queue
379       for pend_req in pending:
380         self._wpool.AddTask(pend_req)
381
382     finally:
383       # In case of an exception we should still wait for the rest, otherwise
384       # another thread from the worker pool could modify the request object
385       # after we returned.
386
387       # And wait for them to finish
388       for pend_req in pending:
389         pend_req.done.wait()
390
391     # Return original list
392     return requests
393
394   def Shutdown(self):
395     self._wpool.Quiesce()
396     self._wpool.TerminateWorkers()