Merge branch 'devel-2.7'
[ganeti-local] / lib / http / client.py
1 #
2 #
3
4 # Copyright (C) 2007, 2008, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """HTTP client module.
22
23 """
24
25 import logging
26 import pycurl
27 import threading
28 from cStringIO import StringIO
29
30 from ganeti import http
31 from ganeti import compat
32 from ganeti import netutils
33 from ganeti import locking
34
35
36 class HttpClientRequest(object):
37   def __init__(self, host, port, method, path, headers=None, post_data=None,
38                read_timeout=None, curl_config_fn=None, nicename=None,
39                completion_cb=None):
40     """Describes an HTTP request.
41
42     @type host: string
43     @param host: Hostname
44     @type port: int
45     @param port: Port
46     @type method: string
47     @param method: Method name
48     @type path: string
49     @param path: Request path
50     @type headers: list or None
51     @param headers: Additional headers to send, list of strings
52     @type post_data: string or None
53     @param post_data: Additional data to send
54     @type read_timeout: int
55     @param read_timeout: if passed, it will be used as the read
56         timeout while reading the response from the server
57     @type curl_config_fn: callable
58     @param curl_config_fn: Function to configure cURL object before request
59     @type nicename: string
60     @param nicename: Name, presentable to a user, to describe this request (no
61                      whitespace)
62     @type completion_cb: callable accepting this request object as a single
63                          parameter
64     @param completion_cb: Callback for request completion
65
66     """
67     assert path.startswith("/"), "Path must start with slash (/)"
68     assert curl_config_fn is None or callable(curl_config_fn)
69     assert completion_cb is None or callable(completion_cb)
70
71     # Request attributes
72     self.host = host
73     self.port = port
74     self.method = method
75     self.path = path
76     self.read_timeout = read_timeout
77     self.curl_config_fn = curl_config_fn
78     self.nicename = nicename
79     self.completion_cb = completion_cb
80
81     if post_data is None:
82       self.post_data = ""
83     else:
84       self.post_data = post_data
85
86     if headers is None:
87       self.headers = []
88     elif isinstance(headers, dict):
89       # Support for old interface
90       self.headers = ["%s: %s" % (name, value)
91                       for name, value in headers.items()]
92     else:
93       self.headers = headers
94
95     # Response status
96     self.success = None
97     self.error = None
98
99     # Response attributes
100     self.resp_status_code = None
101     self.resp_body = None
102
103   def __repr__(self):
104     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
105               "%s:%s" % (self.host, self.port),
106               self.method,
107               self.path]
108
109     return "<%s at %#x>" % (" ".join(status), id(self))
110
111   @property
112   def url(self):
113     """Returns the full URL for this requests.
114
115     """
116     if netutils.IPAddress.IsValid(self.host):
117       address = netutils.FormatAddress((self.host, self.port))
118     else:
119       address = "%s:%s" % (self.host, self.port)
120     # TODO: Support for non-SSL requests
121     return "https://%s%s" % (address, self.path)
122
123
124 def _StartRequest(curl, req):
125   """Starts a request on a cURL object.
126
127   @type curl: pycurl.Curl
128   @param curl: cURL object
129   @type req: L{HttpClientRequest}
130   @param req: HTTP request
131
132   """
133   logging.debug("Starting request %r", req)
134
135   url = req.url
136   method = req.method
137   post_data = req.post_data
138   headers = req.headers
139
140   # PycURL requires strings to be non-unicode
141   assert isinstance(method, str)
142   assert isinstance(url, str)
143   assert isinstance(post_data, str)
144   assert compat.all(isinstance(i, str) for i in headers)
145
146   # Buffer for response
147   resp_buffer = StringIO()
148
149   # Configure client for request
150   curl.setopt(pycurl.VERBOSE, False)
151   curl.setopt(pycurl.NOSIGNAL, True)
152   curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
153   curl.setopt(pycurl.PROXY, "")
154   curl.setopt(pycurl.CUSTOMREQUEST, str(method))
155   curl.setopt(pycurl.URL, url)
156   curl.setopt(pycurl.POSTFIELDS, post_data)
157   curl.setopt(pycurl.HTTPHEADER, headers)
158
159   if req.read_timeout is None:
160     curl.setopt(pycurl.TIMEOUT, 0)
161   else:
162     curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
163
164   # Disable SSL session ID caching (pycurl >= 7.16.0)
165   if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
166     curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
167
168   curl.setopt(pycurl.WRITEFUNCTION, resp_buffer.write)
169
170   # Pass cURL object to external config function
171   if req.curl_config_fn:
172     req.curl_config_fn(curl)
173
174   return _PendingRequest(curl, req, resp_buffer.getvalue)
175
176
177 class _PendingRequest:
178   def __init__(self, curl, req, resp_buffer_read):
179     """Initializes this class.
180
181     @type curl: pycurl.Curl
182     @param curl: cURL object
183     @type req: L{HttpClientRequest}
184     @param req: HTTP request
185     @type resp_buffer_read: callable
186     @param resp_buffer_read: Function to read response body
187
188     """
189     assert req.success is None
190
191     self._curl = curl
192     self._req = req
193     self._resp_buffer_read = resp_buffer_read
194
195   def GetCurlHandle(self):
196     """Returns the cURL object.
197
198     """
199     return self._curl
200
201   def GetCurrentRequest(self):
202     """Returns the current request.
203
204     """
205     return self._req
206
207   def Done(self, errmsg):
208     """Finishes a request.
209
210     @type errmsg: string or None
211     @param errmsg: Error message if request failed
212
213     """
214     curl = self._curl
215     req = self._req
216
217     assert req.success is None, "Request has already been finalized"
218
219     logging.debug("Request %s finished, errmsg=%s", req, errmsg)
220
221     req.success = not bool(errmsg)
222     req.error = errmsg
223
224     # Get HTTP response code
225     req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
226     req.resp_body = self._resp_buffer_read()
227
228     # Ensure no potentially large variables are referenced
229     curl.setopt(pycurl.POSTFIELDS, "")
230     curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
231
232     if req.completion_cb:
233       req.completion_cb(req)
234
235
236 class _NoOpRequestMonitor: # pylint: disable=W0232
237   """No-op request monitor.
238
239   """
240   @staticmethod
241   def acquire(*args, **kwargs):
242     pass
243
244   release = acquire
245   Disable = acquire
246
247
248 class _PendingRequestMonitor:
249   _LOCK = "_lock"
250
251   def __init__(self, owner, pending_fn):
252     """Initializes this class.
253
254     """
255     self._owner = owner
256     self._pending_fn = pending_fn
257
258     # The lock monitor runs in another thread, hence locking is necessary
259     self._lock = locking.SharedLock("PendingHttpRequests")
260     self.acquire = self._lock.acquire
261     self.release = self._lock.release
262
263   @locking.ssynchronized(_LOCK)
264   def Disable(self):
265     """Disable monitor.
266
267     """
268     self._pending_fn = None
269
270   @locking.ssynchronized(_LOCK, shared=1)
271   def GetLockInfo(self, requested): # pylint: disable=W0613
272     """Retrieves information about pending requests.
273
274     @type requested: set
275     @param requested: Requested information, see C{query.LQ_*}
276
277     """
278     # No need to sort here, that's being done by the lock manager and query
279     # library. There are no priorities for requests, hence all show up as
280     # one item under "pending".
281     result = []
282
283     if self._pending_fn:
284       owner_name = self._owner.getName()
285
286       for client in self._pending_fn():
287         req = client.GetCurrentRequest()
288         if req:
289           if req.nicename is None:
290             name = "%s%s" % (req.host, req.path)
291           else:
292             name = req.nicename
293           result.append(("rpc/%s" % name, None, [owner_name], None))
294
295     return result
296
297
298 def _ProcessCurlRequests(multi, requests):
299   """cURL request processor.
300
301   This generator yields a tuple once for every completed request, successful or
302   not. The first value in the tuple is the handle, the second an error message
303   or C{None} for successful requests.
304
305   @type multi: C{pycurl.CurlMulti}
306   @param multi: cURL multi object
307   @type requests: sequence
308   @param requests: cURL request handles
309
310   """
311   for curl in requests:
312     multi.add_handle(curl)
313
314   while True:
315     (ret, active) = multi.perform()
316     assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
317
318     if ret == pycurl.E_CALL_MULTI_PERFORM:
319       # cURL wants to be called again
320       continue
321
322     while True:
323       (remaining_messages, successful, failed) = multi.info_read()
324
325       for curl in successful:
326         multi.remove_handle(curl)
327         yield (curl, None)
328
329       for curl, errnum, errmsg in failed:
330         multi.remove_handle(curl)
331         yield (curl, "Error %s: %s" % (errnum, errmsg))
332
333       if remaining_messages == 0:
334         break
335
336     if active == 0:
337       # No active handles anymore
338       break
339
340     # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
341     # timeouts, which are only evaluated in multi.perform, aren't
342     # unnecessarily delayed.
343     multi.select(1.0)
344
345
346 def ProcessRequests(requests, lock_monitor_cb=None, _curl=pycurl.Curl,
347                     _curl_multi=pycurl.CurlMulti,
348                     _curl_process=_ProcessCurlRequests):
349   """Processes any number of HTTP client requests.
350
351   @type requests: list of L{HttpClientRequest}
352   @param requests: List of all requests
353   @param lock_monitor_cb: Callable for registering with lock monitor
354
355   """
356   assert compat.all((req.error is None and
357                      req.success is None and
358                      req.resp_status_code is None and
359                      req.resp_body is None)
360                     for req in requests)
361
362   # Prepare all requests
363   curl_to_client = \
364     dict((client.GetCurlHandle(), client)
365          for client in map(lambda req: _StartRequest(_curl(), req), requests))
366
367   assert len(curl_to_client) == len(requests)
368
369   if lock_monitor_cb:
370     monitor = _PendingRequestMonitor(threading.currentThread(),
371                                      curl_to_client.values)
372     lock_monitor_cb(monitor)
373   else:
374     monitor = _NoOpRequestMonitor
375
376   # Process all requests and act based on the returned values
377   for (curl, msg) in _curl_process(_curl_multi(), curl_to_client.keys()):
378     monitor.acquire(shared=0)
379     try:
380       curl_to_client.pop(curl).Done(msg)
381     finally:
382       monitor.release()
383
384   assert not curl_to_client, "Not all requests were processed"
385
386   # Don't try to read information anymore as all requests have been processed
387   monitor.Disable()
388
389   assert compat.all(req.error is not None or
390                     (req.success and
391                      req.resp_status_code is not None and
392                      req.resp_body is not None)
393                     for req in requests)