67456378e85b89973e635a53c154a791d52419bc
[ganeti-local] / lib / http / client.py
1 #
2 #
3
4 # Copyright (C) 2007, 2008, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """HTTP client module.
22
23 """
24
25 import logging
26 import pycurl
27 from cStringIO import StringIO
28
29 from ganeti import http
30 from ganeti import compat
31 from ganeti import netutils
32
33
34 class HttpClientRequest(object):
35   def __init__(self, host, port, method, path, headers=None, post_data=None,
36                read_timeout=None, curl_config_fn=None):
37     """Describes an HTTP request.
38
39     @type host: string
40     @param host: Hostname
41     @type port: int
42     @param port: Port
43     @type method: string
44     @param method: Method name
45     @type path: string
46     @param path: Request path
47     @type headers: list or None
48     @param headers: Additional headers to send, list of strings
49     @type post_data: string or None
50     @param post_data: Additional data to send
51     @type read_timeout: int
52     @param read_timeout: if passed, it will be used as the read
53         timeout while reading the response from the server
54     @type curl_config_fn: callable
55     @param curl_config_fn: Function to configure cURL object before request
56                            (Note: if the function configures the connection in
57                            a way where it wouldn't be efficient to reuse them,
58                            a "identity" property should be defined, see
59                            L{HttpClientRequest.identity})
60
61     """
62     assert path.startswith("/"), "Path must start with slash (/)"
63     assert curl_config_fn is None or callable(curl_config_fn)
64
65     # Request attributes
66     self.host = host
67     self.port = port
68     self.method = method
69     self.path = path
70     self.read_timeout = read_timeout
71     self.curl_config_fn = curl_config_fn
72
73     if post_data is None:
74       self.post_data = ""
75     else:
76       self.post_data = post_data
77
78     if headers is None:
79       self.headers = []
80     elif isinstance(headers, dict):
81       # Support for old interface
82       self.headers = ["%s: %s" % (name, value)
83                       for name, value in headers.items()]
84     else:
85       self.headers = headers
86
87     # Response status
88     self.success = None
89     self.error = None
90
91     # Response attributes
92     self.resp_status_code = None
93     self.resp_body = None
94
95   def __repr__(self):
96     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
97               "%s:%s" % (self.host, self.port),
98               self.method,
99               self.path]
100
101     return "<%s at %#x>" % (" ".join(status), id(self))
102
103   @property
104   def url(self):
105     """Returns the full URL for this requests.
106
107     """
108     if netutils.IPAddress.IsValid(self.host):
109       address = netutils.FormatAddress((self.host, self.port))
110     else:
111       address = "%s:%s" % (self.host, self.port)
112     # TODO: Support for non-SSL requests
113     return "https://%s%s" % (address, self.path)
114
115   @property
116   def identity(self):
117     """Returns identifier for retrieving a pooled connection for this request.
118
119     This allows cURL client objects to be re-used and to cache information
120     (e.g. SSL session IDs or connections).
121
122     """
123     parts = [self.host, self.port]
124
125     if self.curl_config_fn:
126       try:
127         parts.append(self.curl_config_fn.identity)
128       except AttributeError:
129         pass
130
131     return "/".join(str(i) for i in parts)
132
133
134 class _HttpClient(object):
135   def __init__(self, curl_config_fn):
136     """Initializes this class.
137
138     @type curl_config_fn: callable
139     @param curl_config_fn: Function to configure cURL object after
140                            initialization
141
142     """
143     self._req = None
144
145     curl = self._CreateCurlHandle()
146     curl.setopt(pycurl.VERBOSE, False)
147     curl.setopt(pycurl.NOSIGNAL, True)
148     curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
149     curl.setopt(pycurl.PROXY, "")
150
151     # Pass cURL object to external config function
152     if curl_config_fn:
153       curl_config_fn(curl)
154
155     self._curl = curl
156
157   @staticmethod
158   def _CreateCurlHandle():
159     """Returns a new cURL object.
160
161     """
162     return pycurl.Curl()
163
164   def GetCurlHandle(self):
165     """Returns the cURL object.
166
167     """
168     return self._curl
169
170   def GetCurrentRequest(self):
171     """Returns the current request.
172
173     @rtype: L{HttpClientRequest} or None
174
175     """
176     return self._req
177
178   def StartRequest(self, req):
179     """Starts a request on this client.
180
181     @type req: L{HttpClientRequest}
182     @param req: HTTP request
183
184     """
185     assert not self._req, "Another request is already started"
186
187     self._req = req
188     self._resp_buffer = StringIO()
189
190     url = req.url
191     method = req.method
192     post_data = req.post_data
193     headers = req.headers
194
195     # PycURL requires strings to be non-unicode
196     assert isinstance(method, str)
197     assert isinstance(url, str)
198     assert isinstance(post_data, str)
199     assert compat.all(isinstance(i, str) for i in headers)
200
201     # Configure cURL object for request
202     curl = self._curl
203     curl.setopt(pycurl.CUSTOMREQUEST, str(method))
204     curl.setopt(pycurl.URL, url)
205     curl.setopt(pycurl.POSTFIELDS, post_data)
206     curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
207     curl.setopt(pycurl.HTTPHEADER, headers)
208
209     if req.read_timeout is None:
210       curl.setopt(pycurl.TIMEOUT, 0)
211     else:
212       curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
213
214     # Pass cURL object to external config function
215     if req.curl_config_fn:
216       req.curl_config_fn(curl)
217
218   def Done(self, errmsg):
219     """Finishes a request.
220
221     @type errmsg: string or None
222     @param errmsg: Error message if request failed
223
224     """
225     req = self._req
226     assert req, "No request"
227
228     logging.debug("Request %s finished, errmsg=%s", req, errmsg)
229
230     curl = self._curl
231
232     req.success = not bool(errmsg)
233     req.error = errmsg
234
235     # Get HTTP response code
236     req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
237     req.resp_body = self._resp_buffer.getvalue()
238
239     # Reset client object
240     self._req = None
241     self._resp_buffer = None
242
243     # Ensure no potentially large variables are referenced
244     curl.setopt(pycurl.POSTFIELDS, "")
245     curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
246
247
248 class _PooledHttpClient:
249   """Data structure for HTTP client pool.
250
251   """
252   def __init__(self, identity, client):
253     """Initializes this class.
254
255     @type identity: string
256     @param identity: Client identifier for pool
257     @type client: L{_HttpClient}
258     @param client: HTTP client
259
260     """
261     self.identity = identity
262     self.client = client
263     self.lastused = 0
264
265   def __repr__(self):
266     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
267               "id=%s" % self.identity,
268               "lastuse=%s" % self.lastused,
269               repr(self.client)]
270
271     return "<%s at %#x>" % (" ".join(status), id(self))
272
273
274 class HttpClientPool:
275   """A simple HTTP client pool.
276
277   Supports one pooled connection per identity (see
278   L{HttpClientRequest.identity}).
279
280   """
281   #: After how many generations to drop unused clients
282   _MAX_GENERATIONS_DROP = 25
283
284   def __init__(self, curl_config_fn):
285     """Initializes this class.
286
287     @type curl_config_fn: callable
288     @param curl_config_fn: Function to configure cURL object after
289                            initialization
290
291     """
292     self._curl_config_fn = curl_config_fn
293     self._generation = 0
294     self._pool = {}
295
296   @staticmethod
297   def _GetHttpClientCreator():
298     """Returns callable to create HTTP client.
299
300     """
301     return _HttpClient
302
303   def _Get(self, identity):
304     """Gets an HTTP client from the pool.
305
306     @type identity: string
307     @param identity: Client identifier
308
309     """
310     try:
311       pclient  = self._pool.pop(identity)
312     except KeyError:
313       # Need to create new client
314       client = self._GetHttpClientCreator()(self._curl_config_fn)
315       pclient = _PooledHttpClient(identity, client)
316       logging.debug("Created new client %s", pclient)
317     else:
318       logging.debug("Reusing client %s", pclient)
319
320     assert pclient.identity == identity
321
322     return pclient
323
324   def _StartRequest(self, req):
325     """Starts a request.
326
327     @type req: L{HttpClientRequest}
328     @param req: HTTP request
329
330     """
331     logging.debug("Starting request %r", req)
332     pclient = self._Get(req.identity)
333
334     assert req.identity not in self._pool
335
336     pclient.client.StartRequest(req)
337     pclient.lastused = self._generation
338
339     return pclient
340
341   def _Return(self, pclients):
342     """Returns HTTP clients to the pool.
343
344     """
345     for pc in pclients:
346       logging.debug("Returning client %s to pool", pc)
347       assert pc.identity not in self._pool
348       assert pc not in self._pool.values()
349       self._pool[pc.identity] = pc
350
351     # Check for unused clients
352     for pc in self._pool.values():
353       if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
354         logging.debug("Removing client %s which hasn't been used"
355                       " for %s generations",
356                       pc, self._MAX_GENERATIONS_DROP)
357         self._pool.pop(pc.identity, None)
358
359     assert compat.all(pc.lastused >= (self._generation -
360                                       self._MAX_GENERATIONS_DROP)
361                       for pc in self._pool.values())
362
363   @staticmethod
364   def _CreateCurlMultiHandle():
365     """Creates new cURL multi handle.
366
367     """
368     return pycurl.CurlMulti()
369
370   def ProcessRequests(self, requests):
371     """Processes any number of HTTP client requests using pooled objects.
372
373     @type requests: list of L{HttpClientRequest}
374     @param requests: List of all requests
375
376     """
377     multi = self._CreateCurlMultiHandle()
378
379     # For client cleanup
380     self._generation += 1
381
382     assert compat.all((req.error is None and
383                        req.success is None and
384                        req.resp_status_code is None and
385                        req.resp_body is None)
386                       for req in requests)
387
388     curl_to_pclient = {}
389     for req in requests:
390       pclient = self._StartRequest(req)
391       curl = pclient.client.GetCurlHandle()
392       curl_to_pclient[curl] = pclient
393       multi.add_handle(curl)
394       assert pclient.client.GetCurrentRequest() == req
395       assert pclient.lastused >= 0
396
397     assert len(curl_to_pclient) == len(requests)
398
399     done_count = 0
400     while True:
401       (ret, _) = multi.perform()
402       assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
403
404       if ret == pycurl.E_CALL_MULTI_PERFORM:
405         # cURL wants to be called again
406         continue
407
408       while True:
409         (remaining_messages, successful, failed) = multi.info_read()
410
411         for curl in successful:
412           multi.remove_handle(curl)
413           done_count += 1
414           pclient = curl_to_pclient[curl]
415           req = pclient.client.GetCurrentRequest()
416           pclient.client.Done(None)
417           assert req.success
418           assert not pclient.client.GetCurrentRequest()
419
420         for curl, errnum, errmsg in failed:
421           multi.remove_handle(curl)
422           done_count += 1
423           pclient = curl_to_pclient[curl]
424           req = pclient.client.GetCurrentRequest()
425           pclient.client.Done("Error %s: %s" % (errnum, errmsg))
426           assert req.error
427           assert not pclient.client.GetCurrentRequest()
428
429         if remaining_messages == 0:
430           break
431
432       assert done_count <= len(requests)
433
434       if done_count == len(requests):
435         break
436
437       # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
438       # timeouts, which are only evaluated in multi.perform, aren't
439       # unnecessarily delayed.
440       multi.select(1.0)
441
442     assert compat.all(pclient.client.GetCurrentRequest() is None
443                       for pclient in curl_to_pclient.values())
444
445     # Return clients to pool
446     self._Return(curl_to_pclient.values())
447
448     assert done_count == len(requests)
449     assert compat.all(req.error is not None or
450                       (req.success and
451                        req.resp_status_code is not None and
452                        req.resp_body is not None)
453                       for req in requests)