Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ 4ba4fe14

History | View | Annotate | Download (12.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import logging
26
import pycurl
27
from cStringIO import StringIO
28

    
29
from ganeti import http
30
from ganeti import compat
31

    
32

    
33
class HttpClientRequest(object):
  """Describes a single HTTP request and holds its outcome.

  The request attributes are set by the constructor. The C{success},
  C{error}, C{resp_status_code} and C{resp_body} attributes start out as
  C{None}.

  """
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               read_timeout=None, curl_config_fn=None):
    """Initializes the request description.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path, must start with a slash
    @type headers: list or None
    @param headers: Additional headers to send, list of strings (a
        dictionary mapping header names to values is converted to such a
        list)
    @type post_data: string or None
    @param post_data: Additional data to send; C{None} is stored as an
        empty string
    @type read_timeout: int
    @param read_timeout: if passed, it will be used as the read
        timeout while reading the response from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object before request
                           (Note: if the function configures the connection in
                           a way where it wouldn't be efficient to reuse them,
                           a "identity" property should be defined, see
                           L{HttpClientRequest.identity})

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    assert curl_config_fn is None or callable(curl_config_fn)

    # Normalize the optional arguments before storing them
    if post_data is None:
      post_data = ""

    if headers is None:
      headers = []
    elif isinstance(headers, dict):
      # Support for old interface: one "Name: value" string per entry
      headers = ["%s: %s" % item for item in headers.items()]

    # Request attributes
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.headers = headers
    self.post_data = post_data
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn

    # Response status
    self.success = None
    self.error = None

    # Response attributes
    self.resp_status_code = None
    self.resp_body = None

  def __repr__(self):
    """Returns a description made of class, target, method and path.

    """
    cls = self.__class__
    fields = ("%s.%s" % (cls.__module__, cls.__name__),
              "%s:%s" % (self.host, self.port),
              self.method,
              self.path)

    return "<%s at %#x>" % (" ".join(fields), id(self))

  @property
  def url(self):
    """Returns the full URL for this request.

    The scheme is always "https".

    """
    # TODO: Support for non-SSL requests
    return "https://%s:%s%s" % (self.host, self.port, self.path)

  @property
  def identity(self):
    """Returns identifier for retrieving a pooled connection for this request.

    This allows cURL client objects to be re-used and to cache information
    (e.g. SSL session IDs or connections).

    The identifier is built from host and port, followed by the C{identity}
    attribute of the configuration function if it has one.

    """
    components = [self.host, self.port]

    config_fn = self.curl_config_fn
    if config_fn:
      try:
        fn_identity = config_fn.identity
      except AttributeError:
        # Configuration function doesn't distinguish connections
        pass
      else:
        components.append(fn_identity)

    return "/".join(map(str, components))
127

    
128

    
129
class _HttpClient(object):
  """Wrapper around one cURL handle processing one request at a time.

  """
  def __init__(self, curl_config_fn):
    """Initializes this class.

    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object after
                           initialization

    """
    self._req = None

    handle = self._CreateCurlHandle()

    # Basic options set once on the handle
    base_options = [
      (pycurl.VERBOSE, False),
      (pycurl.NOSIGNAL, True),
      (pycurl.USERAGENT, http.HTTP_GANETI_VERSION),
      (pycurl.PROXY, ""),
      ]
    for (option, value) in base_options:
      handle.setopt(option, value)

    # Disable SSL session ID caching (pycurl >= 7.16.0)
    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
      handle.setopt(pycurl.SSL_SESSIONID_CACHE, False)

    # Pass cURL object to external config function
    if curl_config_fn:
      curl_config_fn(handle)

    self._curl = handle

  @staticmethod
  def _CreateCurlHandle():
    """Returns a new cURL object.

    """
    return pycurl.Curl()

  def GetCurlHandle(self):
    """Returns the cURL object.

    """
    return self._curl

  def GetCurrentRequest(self):
    """Returns the current request.

    @rtype: L{HttpClientRequest} or None

    """
    return self._req

  def StartRequest(self, req):
    """Starts a request on this client.

    The cURL handle is only configured here; nothing is transferred by this
    method.

    @type req: L{HttpClientRequest}
    @param req: HTTP request

    """
    assert not self._req, "Another request is already started"

    self._req = req
    # Collects the response body via the write callback set below
    self._resp_buffer = StringIO()

    req_url = req.url
    req_method = req.method
    req_body = req.post_data
    req_headers = req.headers

    # PycURL requires strings to be non-unicode
    assert isinstance(req_method, str)
    assert isinstance(req_url, str)
    assert isinstance(req_body, str)
    assert compat.all(isinstance(hdr, str) for hdr in req_headers)

    # Configure cURL object for request
    handle = self._curl
    handle.setopt(pycurl.CUSTOMREQUEST, str(req_method))
    handle.setopt(pycurl.URL, req_url)
    handle.setopt(pycurl.POSTFIELDS, req_body)
    handle.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
    handle.setopt(pycurl.HTTPHEADER, req_headers)

    # Without a read timeout on the request the value 0 is passed to cURL
    timeout = 0
    if req.read_timeout is not None:
      timeout = int(req.read_timeout)
    handle.setopt(pycurl.TIMEOUT, timeout)

    # Pass cURL object to external config function
    if req.curl_config_fn:
      req.curl_config_fn(handle)

  def Done(self, errmsg):
    """Finishes a request.

    Stores status, error message, response code and response body on the
    current request and resets the client.

    @type errmsg: string or None
    @param errmsg: Error message if request failed

    """
    finished = self._req
    assert finished, "No request"

    logging.debug("Request %s finished, errmsg=%s", finished, errmsg)

    handle = self._curl

    # An empty or missing error message means success
    finished.success = not errmsg
    finished.error = errmsg

    # Get HTTP response code
    finished.resp_status_code = handle.getinfo(pycurl.RESPONSE_CODE)
    finished.resp_body = self._resp_buffer.getvalue()

    # Reset client object
    self._req = None
    self._resp_buffer = None

    # Ensure no potentially large variables are referenced
    handle.setopt(pycurl.POSTFIELDS, "")
    handle.setopt(pycurl.WRITEFUNCTION, lambda _: None)
245

    
246

    
247
class _PooledHttpClient:
248
  """Data structure for HTTP client pool.
249

250
  """
251
  def __init__(self, identity, client):
252
    """Initializes this class.
253

254
    @type identity: string
255
    @param identity: Client identifier for pool
256
    @type client: L{_HttpClient}
257
    @param client: HTTP client
258

259
    """
260
    self.identity = identity
261
    self.client = client
262
    self.lastused = 0
263

    
264
  def __repr__(self):
265
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
266
              "id=%s" % self.identity,
267
              "lastuse=%s" % self.lastused,
268
              repr(self.client)]
269

    
270
    return "<%s at %#x>" % (" ".join(status), id(self))
271

    
272

    
273
class HttpClientPool:
  """A simple HTTP client pool.

  Supports one pooled connection per identity (see
  L{HttpClientRequest.identity}).

  Clients are removed from the pool while they process a request and are put
  back by L{ProcessRequests} once all requests have finished. Every call to
  L{ProcessRequests} counts as one generation; clients not used for more than
  L{_MAX_GENERATIONS_DROP} generations are dropped.

  """
  #: After how many generations to drop unused clients
  _MAX_GENERATIONS_DROP = 25

  def __init__(self, curl_config_fn):
    """Initializes this class.

    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object after
                           initialization

    """
    self._curl_config_fn = curl_config_fn
    self._generation = 0
    # Idle clients, indexed by identity
    self._pool = {}

  @staticmethod
  def _GetHttpClientCreator():
    """Returns callable to create HTTP client.

    """
    return _HttpClient

  def _Get(self, identity):
    """Gets an HTTP client from the pool.

    An idle client with the given identity is taken out of the pool; if there
    is none, a new one is created.

    @type identity: string
    @param identity: Client identifier
    @rtype: L{_PooledHttpClient}

    """
    try:
      pclient = self._pool.pop(identity)
    except KeyError:
      # Need to create new client
      client = self._GetHttpClientCreator()(self._curl_config_fn)
      pclient = _PooledHttpClient(identity, client)
      logging.debug("Created new client %s", pclient)
    else:
      logging.debug("Reusing client %s", pclient)

    assert pclient.identity == identity

    return pclient

  def _StartRequest(self, req):
    """Starts a request.

    @type req: L{HttpClientRequest}
    @param req: HTTP request
    @rtype: L{_PooledHttpClient}
    @return: Client on which the request was started

    """
    logging.debug("Starting request %r", req)
    pclient = self._Get(req.identity)

    assert req.identity not in self._pool

    pclient.client.StartRequest(req)
    # Remember the generation for the cleanup done when returning clients
    pclient.lastused = self._generation

    return pclient

  def _Return(self, pclients):
    """Returns HTTP clients to the pool.

    Afterwards all pooled clients which haven't been used for more than
    L{_MAX_GENERATIONS_DROP} generations are removed.

    @type pclients: iterable of L{_PooledHttpClient}
    @param pclients: Clients to put back into the pool

    """
    for pc in pclients:
      logging.debug("Returning client %s to pool", pc)
      # NOTE(review): two requests with the same identity in a single
      # ProcessRequests call each get their own client (the first one is
      # popped from the pool), so returning both would trip this assertion;
      # confirm callers never pass such requests.
      assert pc.identity not in self._pool
      assert pc not in self._pool.values()
      self._pool[pc.identity] = pc

    # Check for unused clients. Entries are removed inside the loop, hence
    # iterate over a snapshot instead of relying on values() returning a copy.
    for pc in list(self._pool.values()):
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
        logging.debug("Removing client %s which hasn't been used"
                      " for %s generations",
                      pc, self._MAX_GENERATIONS_DROP)
        self._pool.pop(pc.identity, None)

    assert compat.all(pc.lastused >= (self._generation -
                                      self._MAX_GENERATIONS_DROP)
                      for pc in self._pool.values())

  @staticmethod
  def _CreateCurlMultiHandle():
    """Creates new cURL multi handle.

    """
    return pycurl.CurlMulti()

  def ProcessRequests(self, requests):
    """Processes any number of HTTP client requests using pooled objects.

    All requests are run through a single cURL multi handle. The results are
    stored on the request objects themselves (C{success}, C{error},
    C{resp_status_code}, C{resp_body}); nothing is returned.

    @type requests: list of L{HttpClientRequest}
    @param requests: List of all requests

    """
    multi = self._CreateCurlMultiHandle()

    # For client cleanup
    self._generation += 1

    # Only requests which haven't been processed yet are accepted
    assert compat.all((req.error is None and
                       req.success is None and
                       req.resp_status_code is None and
                       req.resp_body is None)
                      for req in requests)

    # Start all requests and remember which client owns which cURL handle
    curl_to_pclient = {}
    for req in requests:
      pclient = self._StartRequest(req)
      curl = pclient.client.GetCurlHandle()
      curl_to_pclient[curl] = pclient
      multi.add_handle(curl)
      assert pclient.client.GetCurrentRequest() == req
      assert pclient.lastused >= 0

    assert len(curl_to_pclient) == len(requests)

    done_count = 0
    while True:
      (ret, _) = multi.perform()
      assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

      if ret == pycurl.E_CALL_MULTI_PERFORM:
        # cURL wants to be called again
        continue

      while True:
        (remaining_messages, successful, failed) = multi.info_read()

        # Successful handles first, then failed ones; an error message of
        # None marks success
        finished = [(curl, None) for curl in successful]
        finished.extend([(curl, "Error %s: %s" % (errnum, errmsg))
                         for (curl, errnum, errmsg) in failed])

        for (curl, errmsg) in finished:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          # Must be fetched before Done resets the client
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done(errmsg)
          if errmsg is None:
            assert req.success
          else:
            assert req.error
          assert not pclient.client.GetCurrentRequest()

        if remaining_messages == 0:
          break

      assert done_count <= len(requests)

      if done_count == len(requests):
        break

      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
      # timeouts, which are only evaluated in multi.perform, aren't
      # unnecessarily delayed.
      multi.select(1.0)

    assert compat.all(pclient.client.GetCurrentRequest() is None
                      for pclient in curl_to_pclient.values())

    # Return clients to pool
    self._Return(curl_to_pclient.values())

    assert done_count == len(requests)
    assert compat.all(req.error is not None or
                      (req.success and
                       req.resp_status_code is not None and
                       req.resp_body is not None)
                      for req in requests)