Convert RPC client to PycURL
[ganeti-local] / lib / http / client.py
1 #
2 #
3
4 # Copyright (C) 2007, 2008, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """HTTP client module.
22
23 """
24
25 import logging
26 import pycurl
27 from cStringIO import StringIO
28
29 from ganeti import http
30 from ganeti import compat
31
32
33 class HttpClientRequest(object):
34   def __init__(self, host, port, method, path, headers=None, post_data=None,
35                read_timeout=None, curl_config_fn=None):
36     """Describes an HTTP request.
37
38     @type host: string
39     @param host: Hostname
40     @type port: int
41     @param port: Port
42     @type method: string
43     @param method: Method name
44     @type path: string
45     @param path: Request path
46     @type headers: list or None
47     @param headers: Additional headers to send, list of strings
48     @type post_data: string or None
49     @param post_data: Additional data to send
50     @type read_timeout: int
51     @param read_timeout: if passed, it will be used as the read
52         timeout while reading the response from the server
53     @type curl_config_fn: callable
54     @param curl_config_fn: Function to configure cURL object before request
55                            (Note: if the function configures the connection in
56                            a way where it wouldn't be efficient to reuse them,
57                            a "identity" property should be defined, see
58                            L{HttpClientRequest.identity})
59
60     """
61     assert path.startswith("/"), "Path must start with slash (/)"
62     assert curl_config_fn is None or callable(curl_config_fn)
63
64     # Request attributes
65     self.host = host
66     self.port = port
67     self.method = method
68     self.path = path
69     self.read_timeout = read_timeout
70     self.curl_config_fn = curl_config_fn
71
72     if post_data is None:
73       self.post_data = ""
74     else:
75       self.post_data = post_data
76
77     if headers is None:
78       self.headers = []
79     elif isinstance(headers, dict):
80       # Support for old interface
81       self.headers = ["%s: %s" % (name, value)
82                       for name, value in headers.items()]
83     else:
84       self.headers = headers
85
86     # Response status
87     self.success = None
88     self.error = None
89
90     # Response attributes
91     self.resp_status_code = None
92     self.resp_body = None
93
94   def __repr__(self):
95     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
96               "%s:%s" % (self.host, self.port),
97               self.method,
98               self.path]
99
100     return "<%s at %#x>" % (" ".join(status), id(self))
101
102   @property
103   def url(self):
104     """Returns the full URL for this requests.
105
106     """
107     # TODO: Support for non-SSL requests
108     return "https://%s:%s%s" % (self.host, self.port, self.path)
109
110   @property
111   def identity(self):
112     """Returns identifier for retrieving a pooled connection for this request.
113
114     This allows cURL client objects to be re-used and to cache information
115     (e.g. SSL session IDs or connections).
116
117     """
118     parts = [self.host, self.port]
119
120     if self.curl_config_fn:
121       try:
122         parts.append(self.curl_config_fn.identity)
123       except AttributeError:
124         pass
125
126     return "/".join(str(i) for i in parts)
127
128
129 class _HttpClient(object):
130   def __init__(self, curl_config_fn):
131     """Initializes this class.
132
133     @type curl_config_fn: callable
134     @param curl_config_fn: Function to configure cURL object after
135                            initialization
136
137     """
138     self._req = None
139
140     curl = self._CreateCurlHandle()
141     curl.setopt(pycurl.VERBOSE, False)
142     curl.setopt(pycurl.NOSIGNAL, True)
143     curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
144     curl.setopt(pycurl.PROXY, "")
145
146     # Pass cURL object to external config function
147     if curl_config_fn:
148       curl_config_fn(curl)
149
150     self._curl = curl
151
152   @staticmethod
153   def _CreateCurlHandle():
154     """Returns a new cURL object.
155
156     """
157     return pycurl.Curl()
158
159   def GetCurlHandle(self):
160     """Returns the cURL object.
161
162     """
163     return self._curl
164
165   def GetCurrentRequest(self):
166     """Returns the current request.
167
168     @rtype: L{HttpClientRequest} or None
169
170     """
171     return self._req
172
173   def StartRequest(self, req):
174     """Starts a request on this client.
175
176     @type req: L{HttpClientRequest}
177     @param req: HTTP request
178
179     """
180     assert not self._req, "Another request is already started"
181
182     self._req = req
183     self._resp_buffer = StringIO()
184
185     url = req.url
186     method = req.method
187     post_data = req.post_data
188     headers = req.headers
189
190     # PycURL requires strings to be non-unicode
191     assert isinstance(method, str)
192     assert isinstance(url, str)
193     assert isinstance(post_data, str)
194     assert compat.all(isinstance(i, str) for i in headers)
195
196     # Configure cURL object for request
197     curl = self._curl
198     curl.setopt(pycurl.CUSTOMREQUEST, str(method))
199     curl.setopt(pycurl.URL, url)
200     curl.setopt(pycurl.POSTFIELDS, post_data)
201     curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
202     curl.setopt(pycurl.HTTPHEADER, headers)
203
204     if req.read_timeout is None:
205       curl.setopt(pycurl.TIMEOUT, 0)
206     else:
207       curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
208
209     # Pass cURL object to external config function
210     if req.curl_config_fn:
211       req.curl_config_fn(curl)
212
213   def Done(self, errmsg):
214     """Finishes a request.
215
216     @type errmsg: string or None
217     @param errmsg: Error message if request failed
218
219     """
220     req = self._req
221     assert req, "No request"
222
223     logging.debug("Request %s finished, errmsg=%s", req, errmsg)
224
225     curl = self._curl
226
227     req.success = not bool(errmsg)
228     req.error = errmsg
229
230     # Get HTTP response code
231     req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
232     req.resp_body = self._resp_buffer.getvalue()
233
234     # Reset client object
235     self._req = None
236     self._resp_buffer = None
237
238     # Ensure no potentially large variables are referenced
239     curl.setopt(pycurl.POSTFIELDS, "")
240     curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
241
242
243 class _PooledHttpClient:
244   """Data structure for HTTP client pool.
245
246   """
247   def __init__(self, identity, client):
248     """Initializes this class.
249
250     @type identity: string
251     @param identity: Client identifier for pool
252     @type client: L{_HttpClient}
253     @param client: HTTP client
254
255     """
256     self.identity = identity
257     self.client = client
258     self.lastused = 0
259
260   def __repr__(self):
261     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
262               "id=%s" % self.identity,
263               "lastuse=%s" % self.lastused,
264               repr(self.client)]
265
266     return "<%s at %#x>" % (" ".join(status), id(self))
267
268
269 class HttpClientPool:
270   """A simple HTTP client pool.
271
272   Supports one pooled connection per identity (see
273   L{HttpClientRequest.identity}).
274
275   """
276   #: After how many generations to drop unused clients
277   _MAX_GENERATIONS_DROP = 25
278
279   def __init__(self, curl_config_fn):
280     """Initializes this class.
281
282     @type curl_config_fn: callable
283     @param curl_config_fn: Function to configure cURL object after
284                            initialization
285
286     """
287     self._curl_config_fn = curl_config_fn
288     self._generation = 0
289     self._pool = {}
290
291   @staticmethod
292   def _GetHttpClientCreator():
293     """Returns callable to create HTTP client.
294
295     """
296     return _HttpClient
297
298   def _Get(self, identity):
299     """Gets an HTTP client from the pool.
300
301     @type identity: string
302     @param identity: Client identifier
303
304     """
305     try:
306       pclient  = self._pool.pop(identity)
307     except KeyError:
308       # Need to create new client
309       client = self._GetHttpClientCreator()(self._curl_config_fn)
310       pclient = _PooledHttpClient(identity, client)
311       logging.debug("Created new client %s", pclient)
312     else:
313       logging.debug("Reusing client %s", pclient)
314
315     assert pclient.identity == identity
316
317     return pclient
318
319   def _StartRequest(self, req):
320     """Starts a request.
321
322     @type req: L{HttpClientRequest}
323     @param req: HTTP request
324
325     """
326     logging.debug("Starting request %r", req)
327     pclient = self._Get(req.identity)
328
329     assert req.identity not in self._pool
330
331     pclient.client.StartRequest(req)
332     pclient.lastused = self._generation
333
334     return pclient
335
336   def _Return(self, pclients):
337     """Returns HTTP clients to the pool.
338
339     """
340     for pc in pclients:
341       logging.debug("Returning client %s to pool", pc)
342       assert pc.identity not in self._pool
343       assert pc not in self._pool.values()
344       self._pool[pc.identity] = pc
345
346     # Check for unused clients
347     for pc in self._pool.values():
348       if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
349         logging.debug("Removing client %s which hasn't been used"
350                       " for %s generations",
351                       pc, self._MAX_GENERATIONS_DROP)
352         self._pool.pop(pc.identity, None)
353
354     assert compat.all(pc.lastused >= (self._generation -
355                                       self._MAX_GENERATIONS_DROP)
356                       for pc in self._pool.values())
357
358   @staticmethod
359   def _CreateCurlMultiHandle():
360     """Creates new cURL multi handle.
361
362     """
363     return pycurl.CurlMulti()
364
365   def ProcessRequests(self, requests):
366     """Processes any number of HTTP client requests using pooled objects.
367
368     @type requests: list of L{HttpClientRequest}
369     @param requests: List of all requests
370
371     """
372     multi = self._CreateCurlMultiHandle()
373
374     # For client cleanup
375     self._generation += 1
376
377     assert compat.all((req.error is None and
378                        req.success is None and
379                        req.resp_status_code is None and
380                        req.resp_body is None)
381                       for req in requests)
382
383     curl_to_pclient = {}
384     for req in requests:
385       pclient = self._StartRequest(req)
386       curl = pclient.client.GetCurlHandle()
387       curl_to_pclient[curl] = pclient
388       multi.add_handle(curl)
389       assert pclient.client.GetCurrentRequest() == req
390       assert pclient.lastused >= 0
391
392     assert len(curl_to_pclient) == len(requests)
393
394     done_count = 0
395     while True:
396       (ret, _) = multi.perform()
397       assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
398
399       if ret == pycurl.E_CALL_MULTI_PERFORM:
400         # cURL wants to be called again
401         continue
402
403       while True:
404         (remaining_messages, successful, failed) = multi.info_read()
405
406         for curl in successful:
407           multi.remove_handle(curl)
408           done_count += 1
409           pclient = curl_to_pclient[curl]
410           req = pclient.client.GetCurrentRequest()
411           pclient.client.Done(None)
412           assert req.success
413           assert not pclient.client.GetCurrentRequest()
414
415         for curl, errnum, errmsg in failed:
416           multi.remove_handle(curl)
417           done_count += 1
418           pclient = curl_to_pclient[curl]
419           req = pclient.client.GetCurrentRequest()
420           pclient.client.Done("Error %s: %s" % (errnum, errmsg))
421           assert req.error
422           assert not pclient.client.GetCurrentRequest()
423
424         if remaining_messages == 0:
425           break
426
427       assert done_count <= len(requests)
428
429       if done_count == len(requests):
430         break
431
432       # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
433       # timeouts, which are only evaluated in multi.perform, aren't
434       # unnecessarily delayed.
435       multi.select(1.0)
436
437     assert compat.all(pclient.client.GetCurrentRequest() is None
438                       for pclient in curl_to_pclient.values())
439
440     # Return clients to pool
441     self._Return(curl_to_pclient.values())
442
443     assert done_count == len(requests)
444     assert compat.all(req.error is not None or
445                       (req.success and
446                        req.resp_status_code is not None and
447                        req.resp_body is not None)
448                       for req in requests)