4 # Copyright (C) 2007, 2008, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """HTTP client module.
28 from cStringIO import StringIO
30 from ganeti import http
31 from ganeti import compat
32 from ganeti import netutils
33 from ganeti import locking
36 class HttpClientRequest(object):
37 def __init__(self, host, port, method, path, headers=None, post_data=None,
38 read_timeout=None, curl_config_fn=None, nicename=None):
39 """Describes an HTTP request.
46 @param method: Method name
48 @param path: Request path
49 @type headers: list or None
50 @param headers: Additional headers to send, list of strings
51 @type post_data: string or None
52 @param post_data: Additional data to send
53 @type read_timeout: int
54 @param read_timeout: if passed, it will be used as the read
55 timeout while reading the response from the server
56 @type curl_config_fn: callable
57 @param curl_config_fn: Function to configure cURL object before request
58 @type nicename: string
59 @param nicename: Name, presentable to a user, to describe this request (no
63 assert path.startswith("/"), "Path must start with slash (/)"
64 assert curl_config_fn is None or callable(curl_config_fn)
71 self.read_timeout = read_timeout
72 self.curl_config_fn = curl_config_fn
73 self.nicename = nicename
78 self.post_data = post_data
82 elif isinstance(headers, dict):
83 # Support for old interface
84 self.headers = ["%s: %s" % (name, value)
85 for name, value in headers.items()]
87 self.headers = headers
94 self.resp_status_code = None
98 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
99 "%s:%s" % (self.host, self.port),
103 return "<%s at %#x>" % (" ".join(status), id(self))
107 """Returns the full URL for this request.
110 if netutils.IPAddress.IsValid(self.host):
111 address = netutils.FormatAddress((self.host, self.port))
113 address = "%s:%s" % (self.host, self.port)
114 # TODO: Support for non-SSL requests
115 return "https://%s%s" % (address, self.path)
118 def _StartRequest(curl, req):
119 """Starts a request on a cURL object.
121 @type curl: pycurl.Curl
122 @param curl: cURL object
123 @type req: L{HttpClientRequest}
124 @param req: HTTP request
127 logging.debug("Starting request %r", req)
131 post_data = req.post_data
132 headers = req.headers
134 # PycURL requires strings to be non-unicode
135 assert isinstance(method, str)
136 assert isinstance(url, str)
137 assert isinstance(post_data, str)
138 assert compat.all(isinstance(i, str) for i in headers)
140 # Buffer for response
141 resp_buffer = StringIO()
143 # Configure client for request
144 curl.setopt(pycurl.VERBOSE, False)
145 curl.setopt(pycurl.NOSIGNAL, True)
146 curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
147 curl.setopt(pycurl.PROXY, "")
148 curl.setopt(pycurl.CUSTOMREQUEST, str(method))
149 curl.setopt(pycurl.URL, url)
150 curl.setopt(pycurl.POSTFIELDS, post_data)
151 curl.setopt(pycurl.HTTPHEADER, headers)
153 if req.read_timeout is None:
154 curl.setopt(pycurl.TIMEOUT, 0)
156 curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
158 # Disable SSL session ID caching (pycurl >= 7.16.0)
159 if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
160 curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
162 curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write)
164 # Pass cURL object to external config function
165 if req.curl_config_fn:
166 req.curl_config_fn(curl)
168 return _PendingRequest(curl, req, resp_buffer.getvalue)
171 class _PendingRequest:
172 def __init__(self, curl, req, resp_buffer_read):
173 """Initializes this class.
175 @type curl: pycurl.Curl
176 @param curl: cURL object
177 @type req: L{HttpClientRequest}
178 @param req: HTTP request
179 @type resp_buffer_read: callable
180 @param resp_buffer_read: Function to read response body
183 assert req.success is None
187 self._resp_buffer_read = resp_buffer_read
189 def GetCurlHandle(self):
190 """Returns the cURL object.
195 def GetCurrentRequest(self):
196 """Returns the current request.
201 def Done(self, errmsg):
202 """Finishes a request.
204 @type errmsg: string or None
205 @param errmsg: Error message if request failed
211 assert req.success is None, "Request has already been finalized"
213 logging.debug("Request %s finished, errmsg=%s", req, errmsg)
215 req.success = not bool(errmsg)
218 # Get HTTP response code
219 req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
220 req.resp_body = self._resp_buffer_read()
222 # Ensure no potentially large variables are referenced
224 # Only available in PycURL 7.19.0 and above
225 reset_fn = curl.reset
226 except AttributeError:
227 curl.setopt(pycurl.POSTFIELDS, "")
228 curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
233 class _NoOpRequestMonitor: # pylint: disable=W0232
234 """No-op request monitor.
238 def acquire(*args, **kwargs):
245 class _PendingRequestMonitor:
248 def __init__(self, owner, pending_fn):
249 """Initializes this class.
253 self._pending_fn = pending_fn
255 # The lock monitor runs in another thread, hence locking is necessary
256 self._lock = locking.SharedLock("PendingHttpRequests")
257 self.acquire = self._lock.acquire
258 self.release = self._lock.release
260 @locking.ssynchronized(_LOCK)
265 self._pending_fn = None
267 @locking.ssynchronized(_LOCK, shared=1)
268 def GetLockInfo(self, requested): # pylint: disable=W0613
269 """Retrieves information about pending requests.
272 @param requested: Requested information, see C{query.LQ_*}
275 # No need to sort here, that's being done by the lock manager and query
276 # library. There are no priorities for requests, hence all show up as
277 # one item under "pending".
281 owner_name = self._owner.getName()
283 for client in self._pending_fn():
284 req = client.GetCurrentRequest()
286 if req.nicename is None:
287 name = "%s%s" % (req.host, req.path)
290 result.append(("rpc/%s" % name, None, [owner_name], None))
295 def _ProcessCurlRequests(multi, requests):
296 """cURL request processor.
298 This generator yields a tuple once for every completed request, successful or
299 not. The first value in the tuple is the handle, the second an error message
300 or C{None} for successful requests.
302 @type multi: C{pycurl.CurlMulti}
303 @param multi: cURL multi object
304 @type requests: sequence
305 @param requests: cURL request handles
308 for curl in requests:
309 multi.add_handle(curl)
312 (ret, active) = multi.perform()
313 assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
315 if ret == pycurl.E_CALL_MULTI_PERFORM:
316 # cURL wants to be called again
320 (remaining_messages, successful, failed) = multi.info_read()
322 for curl in successful:
323 multi.remove_handle(curl)
326 for curl, errnum, errmsg in failed:
327 multi.remove_handle(curl)
328 yield (curl, "Error %s: %s" % (errnum, errmsg))
330 if remaining_messages == 0:
334 # No active handles anymore
337 # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
338 # timeouts, which are only evaluated in multi.perform, aren't
339 # unnecessarily delayed.
343 def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl,
344 _curl_multi=pycurl.CurlMulti,
345 _curl_process=_ProcessCurlRequests):
346 """Processes any number of HTTP client requests.
348 @type requests: list of L{HttpClientRequest}
349 @param requests: List of all requests
350 @param lock_monitor_cb: Callable for registering with lock monitor
353 assert compat.all((req.error is None and
354 req.success is None and
355 req.resp_status_code is None and
356 req.resp_body is None)
359 # Prepare all requests
361 dict((client.GetCurlHandle(), client)
362 for client in map(lambda req: _StartRequest(_curl(), req), requests))
364 assert len(curl_to_client) == len(requests)
367 monitor = _PendingRequestMonitor(threading.currentThread(),
368 curl_to_client.values)
369 lock_monitor_cb(monitor)
371 monitor = _NoOpRequestMonitor
373 # Process all requests and act based on the returned values
374 for (curl, msg) in _curl_process(_curl_multi(), curl_to_client.keys()):
375 monitor.acquire(shared=0)
377 curl_to_client.pop(curl).Done(msg)
381 assert not curl_to_client, "Not all requests were processed"
383 # Don't try to read information anymore as all requests have been processed
386 assert compat.all(req.error is not None or
388 req.resp_status_code is not None and
389 req.resp_body is not None)