# Copyright (C) 2007, 2008, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""HTTP client module.

"""

import logging
import pycurl
import threading
from cStringIO import StringIO

from ganeti import http
from ganeti import compat
from ganeti import netutils
from ganeti import locking


class HttpClientRequest(object):
    """Describes one HTTP client request and, once processed, its result.

    The constructor stores the request attributes. The result attributes
    (C{success}, C{error}, C{resp_status_code}, C{resp_body}) start out as
    C{None} and are filled in when the request is finalized.

    """
    def __init__(self, host, port, method, path, headers=None, post_data=None,
                 read_timeout=None, curl_config_fn=None, nicename=None,
                 completion_cb=None):
        """Describes an HTTP request.

        @type host: string
        @param host: Hostname
        @param port: Port
        @type method: string
        @param method: Method name
        @type path: string
        @param path: Request path, must start with a slash
        @type headers: list or None
        @param headers: Additional headers to send, list of strings; a
            dictionary (old interface) is converted to "Name: value" strings
        @type post_data: string or None
        @param post_data: Additional data to send; C{None} is stored as an
            empty string
        @type read_timeout: int
        @param read_timeout: if passed, it will be used as the read
            timeout while reading the response from the server
        @type curl_config_fn: callable
        @param curl_config_fn: Function to configure cURL object before
            request
        @type nicename: string
        @param nicename: Name, presentable to a user, to describe this
            request (no spaces)
        @type completion_cb: callable accepting this request object as a
            single parameter
        @param completion_cb: Callback for request completion

        """
        assert path.startswith("/"), "Path must start with slash (/)"
        assert curl_config_fn is None or callable(curl_config_fn)
        assert completion_cb is None or callable(completion_cb)

        # Request attributes
        self.host = host
        self.port = port
        self.method = method
        self.path = path
        self.read_timeout = read_timeout
        self.curl_config_fn = curl_config_fn
        self.nicename = nicename
        self.completion_cb = completion_cb

        if post_data is None:
            self.post_data = ""
        else:
            self.post_data = post_data

        if headers is None:
            self.headers = []
        elif isinstance(headers, dict):
            # Support for old interface
            self.headers = ["%s: %s" % (name, value)
                            for name, value in headers.items()]
        else:
            self.headers = headers

        # Response status
        self.success = None
        self.error = None

        # Response attributes
        self.resp_status_code = None
        self.resp_body = None

    def __repr__(self):
        """Returns a debug representation with class, endpoint, method, path.

        """
        status = ["%s.%s" % (self.__class__.__module__,
                             self.__class__.__name__),
                  "%s:%s" % (self.host, self.port),
                  self.method,
                  self.path]

        return "<%s at %#x>" % (" ".join(status), id(self))

    @property
    def url(self):
        """Returns the full URL for this request.

        """
        if netutils.IPAddress.IsValid(self.host):
            # Literal IP addresses are formatted by the netutils helper
            address = netutils.FormatAddress((self.host, self.port))
        else:
            address = "%s:%s" % (self.host, self.port)
        # TODO: Support for non-SSL requests
        return "https://%s%s" % (address, self.path)
def _StartRequest(curl, req):
    """Starts a request on a cURL object.

    Configures the given cURL object from the request's attributes and sets
    up a buffer that collects the response body.

    @type curl: pycurl.Curl
    @param curl: cURL object
    @type req: L{HttpClientRequest}
    @param req: HTTP request
    @return: L{_PendingRequest} wrapping the cURL object, the request and a
        function returning the response body collected so far

    """
    logging.debug("Starting request %r", req)

    url = req.url
    method = req.method
    post_data = req.post_data
    headers = req.headers

    # PycURL requires strings to be non-unicode
    assert isinstance(method, str)
    assert isinstance(url, str)
    assert isinstance(post_data, str)
    assert compat.all(isinstance(i, str) for i in headers)

    # Buffer for response
    resp_buffer = StringIO()

    # Configure client for request
    curl.setopt(pycurl.VERBOSE, False)
    curl.setopt(pycurl.NOSIGNAL, True)
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
    curl.setopt(pycurl.PROXY, "")
    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
    curl.setopt(pycurl.URL, url)
    curl.setopt(pycurl.POSTFIELDS, post_data)
    curl.setopt(pycurl.HTTPHEADER, headers)

    if req.read_timeout is None:
        curl.setopt(pycurl.TIMEOUT, 0)
    else:
        curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))

    # Disable SSL session ID caching (pycurl >= 7.16.0)
    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
        curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)

    curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write)

    # Pass cURL object to external config function; done last so it sees
    # the settings made above
    if req.curl_config_fn:
        req.curl_config_fn(curl)

    return _PendingRequest(curl, req, resp_buffer.getvalue)
177 class _PendingRequest:
178 def __init__(self, curl, req, resp_buffer_read):
179 """Initializes this class.
181 @type curl: pycurl.Curl
182 @param curl: cURL object
183 @type req: L{HttpClientRequest}
184 @param req: HTTP request
185 @type resp_buffer_read: callable
186 @param resp_buffer_read: Function to read response body
189 assert req.success is None
193 self._resp_buffer_read = resp_buffer_read
195 def GetCurlHandle(self):
196 """Returns the cURL object.
201 def GetCurrentRequest(self):
202 """Returns the current request.
207 def Done(self, errmsg):
208 """Finishes a request.
210 @type errmsg: string or None
211 @param errmsg: Error message if request failed
217 assert req.success is None, "Request has already been finalized"
219 logging.debug("Request %s finished, errmsg=%s", req, errmsg)
221 req.success = not bool(errmsg)
224 # Get HTTP response code
225 req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
226 req.resp_body = self._resp_buffer_read()
228 # Ensure no potentially large variables are referenced
229 curl.setopt(pycurl.POSTFIELDS, "")
230 curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
232 if req.completion_cb:
233 req.completion_cb(req)
236 class _NoOpRequestMonitor: # pylint: disable=W0232
237 """No-op request monitor.
241 def acquire(*args, **kwargs):
class _PendingRequestMonitor:
    """Reports the currently pending HTTP requests to the lock monitor.

    """
    # Name of the instance attribute holding the lock, as passed to
    # locking.ssynchronized below
    _LOCK = "_lock"

    def __init__(self, owner, pending_fn):
        """Initializes this class.

        @param owner: Object whose C{getName()} result is reported as the
            owner of every pending request
        @type pending_fn: callable
        @param pending_fn: Function returning the pending request clients
            (objects providing C{GetCurrentRequest})

        """
        self._owner = owner
        self._pending_fn = pending_fn

        # The lock monitor runs in another thread, hence locking is necessary
        self._lock = locking.SharedLock("PendingHttpRequests")
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    @locking.ssynchronized(_LOCK)
    def Disable(self):
        """Disable monitor.

        Afterwards L{GetLockInfo} no longer queries pending requests.

        """
        self._pending_fn = None

    @locking.ssynchronized(_LOCK, shared=1)
    def GetLockInfo(self, requested):  # pylint: disable=W0613
        """Retrieves information about pending requests.

        @type requested: set
        @param requested: Requested information, see C{query.LQ_*}
        @return: List with one C{("rpc/<name>", None, [owner name], None)}
            tuple per pending request; empty once the monitor is disabled

        """
        # No need to sort here, that's being done by the lock manager and
        # query library. There are no priorities for requests, hence all
        # show up as one item under "pending".
        result = []

        if self._pending_fn:
            owner_name = self._owner.getName()

            for client in self._pending_fn():
                req = client.GetCurrentRequest()
                if req:
                    if req.nicename is None:
                        name = "%s%s" % (req.host, req.path)
                    else:
                        name = req.nicename
                    result.append(("rpc/%s" % name, None, [owner_name], None))

        return result
def _ProcessCurlRequests(multi, requests):
    """cURL request processor.

    This generator yields a tuple once for every completed request,
    successful or not. The first value in the tuple is the handle, the second
    an error message or C{None} for successful requests.

    @type multi: C{pycurl.CurlMulti}
    @param multi: cURL multi object
    @type requests: sequence
    @param requests: cURL request handles

    """
    for curl in requests:
        multi.add_handle(curl)

    while True:
        (ret, active) = multi.perform()
        assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

        if ret == pycurl.E_CALL_MULTI_PERFORM:
            # cURL wants to be called again
            continue

        # Drain all queued completion messages before waiting again
        while True:
            (remaining_messages, successful, failed) = multi.info_read()

            for curl in successful:
                multi.remove_handle(curl)
                yield (curl, None)

            for curl, errnum, errmsg in failed:
                multi.remove_handle(curl)
                yield (curl, "Error %s: %s" % (errnum, errmsg))

            if remaining_messages == 0:
                break

        if active == 0:
            # No active handles anymore
            break

        # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
        # timeouts, which are only evaluated in multi.perform, aren't
        # unnecessarily delayed.
        multi.select(1.0)
def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl,
                    _curl_multi=pycurl.CurlMulti,
                    _curl_process=_ProcessCurlRequests):
    """Processes any number of HTTP client requests.

    Every request is started on its own cURL object, all of them are driven
    to completion and each request object is finalized with its result.

    @type requests: list of L{HttpClientRequest}
    @param requests: List of all requests
    @param lock_monitor_cb: Callable for registering with lock monitor; it
        is called once with the monitor object for the pending requests
    @param _curl: Factory for cURL objects (overridable, e.g. for tests)
    @param _curl_multi: Factory for the cURL multi object
    @param _curl_process: Request processor, called with the multi object
        and the cURL handles; must yield C{(handle, error message)} tuples

    """
    assert compat.all((req.error is None and
                       req.success is None and
                       req.resp_status_code is None and
                       req.resp_body is None)
                      for req in requests)

    # Prepare all requests
    clients = [_StartRequest(_curl(), req) for req in requests]
    curl_to_client = dict((client.GetCurlHandle(), client)
                          for client in clients)

    assert len(curl_to_client) == len(requests)

    if lock_monitor_cb:
        monitor = _PendingRequestMonitor(threading.currentThread(),
                                         curl_to_client.values)
        lock_monitor_cb(monitor)
    else:
        monitor = _NoOpRequestMonitor

    # Process all requests and act based on the returned values. The
    # processor gets a snapshot of the handles as the dictionary is modified
    # while results are consumed.
    for (curl, msg) in _curl_process(_curl_multi(), list(curl_to_client)):
        # Exclusive access: the lock monitor may be reading the pending
        # requests from another thread
        monitor.acquire(shared=0)
        try:
            curl_to_client.pop(curl).Done(msg)
        finally:
            monitor.release()

    assert not curl_to_client, "Not all requests were processed"

    # Don't try to read information anymore as all requests have been
    # processed
    monitor.Disable()

    assert compat.all(req.error is not None or
                      (req.success and
                       req.resp_status_code is not None and
                       req.resp_body is not None)
                      for req in requests)