Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ e687ec01

History | View | Annotate | Download (12.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import logging
26
import pycurl
27
from cStringIO import StringIO
28

    
29
from ganeti import http
30
from ganeti import compat
31
from ganeti import netutils
32

    
33

    
34
class HttpClientRequest(object):
35
  def __init__(self, host, port, method, path, headers=None, post_data=None,
36
               read_timeout=None, curl_config_fn=None):
37
    """Describes an HTTP request.
38

39
    @type host: string
40
    @param host: Hostname
41
    @type port: int
42
    @param port: Port
43
    @type method: string
44
    @param method: Method name
45
    @type path: string
46
    @param path: Request path
47
    @type headers: list or None
48
    @param headers: Additional headers to send, list of strings
49
    @type post_data: string or None
50
    @param post_data: Additional data to send
51
    @type read_timeout: int
52
    @param read_timeout: if passed, it will be used as the read
53
        timeout while reading the response from the server
54
    @type curl_config_fn: callable
55
    @param curl_config_fn: Function to configure cURL object before request
56
                           (Note: if the function configures the connection in
57
                           a way where it wouldn't be efficient to reuse them,
58
                           a "identity" property should be defined, see
59
                           L{HttpClientRequest.identity})
60

61
    """
62
    assert path.startswith("/"), "Path must start with slash (/)"
63
    assert curl_config_fn is None or callable(curl_config_fn)
64

    
65
    # Request attributes
66
    self.host = host
67
    self.port = port
68
    self.method = method
69
    self.path = path
70
    self.read_timeout = read_timeout
71
    self.curl_config_fn = curl_config_fn
72

    
73
    if post_data is None:
74
      self.post_data = ""
75
    else:
76
      self.post_data = post_data
77

    
78
    if headers is None:
79
      self.headers = []
80
    elif isinstance(headers, dict):
81
      # Support for old interface
82
      self.headers = ["%s: %s" % (name, value)
83
                      for name, value in headers.items()]
84
    else:
85
      self.headers = headers
86

    
87
    # Response status
88
    self.success = None
89
    self.error = None
90

    
91
    # Response attributes
92
    self.resp_status_code = None
93
    self.resp_body = None
94

    
95
  def __repr__(self):
96
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
97
              "%s:%s" % (self.host, self.port),
98
              self.method,
99
              self.path]
100

    
101
    return "<%s at %#x>" % (" ".join(status), id(self))
102

    
103
  @property
104
  def url(self):
105
    """Returns the full URL for this requests.
106

107
    """
108
    if netutils.IPAddress.IsValid(self.host):
109
      address = netutils.FormatAddress((self.host, self.port))
110
    else:
111
      address = "%s:%s" % (self.host, self.port)
112
    # TODO: Support for non-SSL requests
113
    return "https://%s%s" % (address, self.path)
114

    
115
  @property
116
  def identity(self):
117
    """Returns identifier for retrieving a pooled connection for this request.
118

119
    This allows cURL client objects to be re-used and to cache information
120
    (e.g. SSL session IDs or connections).
121

122
    """
123
    parts = [self.host, self.port]
124

    
125
    if self.curl_config_fn:
126
      try:
127
        parts.append(self.curl_config_fn.identity)
128
      except AttributeError:
129
        pass
130

    
131
    return "/".join(str(i) for i in parts)
132

    
133

    
134
class _HttpClient(object):
135
  def __init__(self, curl_config_fn):
136
    """Initializes this class.
137

138
    @type curl_config_fn: callable
139
    @param curl_config_fn: Function to configure cURL object after
140
                           initialization
141

142
    """
143
    self._req = None
144

    
145
    curl = self._CreateCurlHandle()
146
    curl.setopt(pycurl.VERBOSE, False)
147
    curl.setopt(pycurl.NOSIGNAL, True)
148
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
149
    curl.setopt(pycurl.PROXY, "")
150

    
151
    # Disable SSL session ID caching (pycurl >= 7.16.0)
152
    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
153
      curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
154

    
155
    # Pass cURL object to external config function
156
    if curl_config_fn:
157
      curl_config_fn(curl)
158

    
159
    self._curl = curl
160

    
161
  @staticmethod
162
  def _CreateCurlHandle():
163
    """Returns a new cURL object.
164

165
    """
166
    return pycurl.Curl()
167

    
168
  def GetCurlHandle(self):
169
    """Returns the cURL object.
170

171
    """
172
    return self._curl
173

    
174
  def GetCurrentRequest(self):
175
    """Returns the current request.
176

177
    @rtype: L{HttpClientRequest} or None
178

179
    """
180
    return self._req
181

    
182
  def StartRequest(self, req):
183
    """Starts a request on this client.
184

185
    @type req: L{HttpClientRequest}
186
    @param req: HTTP request
187

188
    """
189
    assert not self._req, "Another request is already started"
190

    
191
    logging.debug("Starting request %r", req)
192

    
193
    self._req = req
194
    self._resp_buffer = StringIO()
195

    
196
    url = req.url
197
    method = req.method
198
    post_data = req.post_data
199
    headers = req.headers
200

    
201
    # PycURL requires strings to be non-unicode
202
    assert isinstance(method, str)
203
    assert isinstance(url, str)
204
    assert isinstance(post_data, str)
205
    assert compat.all(isinstance(i, str) for i in headers)
206

    
207
    # Configure cURL object for request
208
    curl = self._curl
209
    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
210
    curl.setopt(pycurl.URL, url)
211
    curl.setopt(pycurl.POSTFIELDS, post_data)
212
    curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
213
    curl.setopt(pycurl.HTTPHEADER, headers)
214

    
215
    if req.read_timeout is None:
216
      curl.setopt(pycurl.TIMEOUT, 0)
217
    else:
218
      curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
219

    
220
    # Pass cURL object to external config function
221
    if req.curl_config_fn:
222
      req.curl_config_fn(curl)
223

    
224
  def Done(self, errmsg):
225
    """Finishes a request.
226

227
    @type errmsg: string or None
228
    @param errmsg: Error message if request failed
229

230
    """
231
    req = self._req
232
    assert req, "No request"
233

    
234
    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
235

    
236
    curl = self._curl
237

    
238
    req.success = not bool(errmsg)
239
    req.error = errmsg
240

    
241
    # Get HTTP response code
242
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
243
    req.resp_body = self._resp_buffer.getvalue()
244

    
245
    # Reset client object
246
    self._req = None
247
    self._resp_buffer = None
248

    
249
    # Ensure no potentially large variables are referenced
250
    curl.setopt(pycurl.POSTFIELDS, "")
251
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
252

    
253

    
254
class _PooledHttpClient:
255
  """Data structure for HTTP client pool.
256

257
  """
258
  def __init__(self, identity, client):
259
    """Initializes this class.
260

261
    @type identity: string
262
    @param identity: Client identifier for pool
263
    @type client: L{_HttpClient}
264
    @param client: HTTP client
265

266
    """
267
    self.identity = identity
268
    self.client = client
269
    self.lastused = 0
270

    
271
  def __repr__(self):
272
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
273
              "id=%s" % self.identity,
274
              "lastuse=%s" % self.lastused,
275
              repr(self.client)]
276

    
277
    return "<%s at %#x>" % (" ".join(status), id(self))
278

    
279

    
280
class HttpClientPool:
281
  """A simple HTTP client pool.
282

283
  Supports one pooled connection per identity (see
284
  L{HttpClientRequest.identity}).
285

286
  """
287
  #: After how many generations to drop unused clients
288
  _MAX_GENERATIONS_DROP = 25
289

    
290
  def __init__(self, curl_config_fn):
291
    """Initializes this class.
292

293
    @type curl_config_fn: callable
294
    @param curl_config_fn: Function to configure cURL object after
295
                           initialization
296

297
    """
298
    self._curl_config_fn = curl_config_fn
299
    self._generation = 0
300
    self._pool = {}
301

    
302
    # Create custom logger for HTTP client pool. Change logging level to
303
    # C{logging.NOTSET} to get more details.
304
    self._logger = logging.getLogger(self.__class__.__name__)
305
    self._logger.setLevel(logging.INFO)
306

    
307
  @staticmethod
308
  def _GetHttpClientCreator():
309
    """Returns callable to create HTTP client.
310

311
    """
312
    return _HttpClient
313

    
314
  def _Get(self, identity):
315
    """Gets an HTTP client from the pool.
316

317
    @type identity: string
318
    @param identity: Client identifier
319

320
    """
321
    try:
322
      pclient = self._pool.pop(identity)
323
    except KeyError:
324
      # Need to create new client
325
      client = self._GetHttpClientCreator()(self._curl_config_fn)
326
      pclient = _PooledHttpClient(identity, client)
327
      self._logger.debug("Created new client %s", pclient)
328
    else:
329
      self._logger.debug("Reusing client %s", pclient)
330

    
331
    assert pclient.identity == identity
332

    
333
    return pclient
334

    
335
  def _StartRequest(self, req):
336
    """Starts a request.
337

338
    @type req: L{HttpClientRequest}
339
    @param req: HTTP request
340

341
    """
342
    pclient = self._Get(req.identity)
343

    
344
    assert req.identity not in self._pool
345

    
346
    pclient.client.StartRequest(req)
347
    pclient.lastused = self._generation
348

    
349
    return pclient
350

    
351
  def _Return(self, pclients):
352
    """Returns HTTP clients to the pool.
353

354
    """
355
    for pc in pclients:
356
      self._logger.debug("Returning client %s to pool", pc)
357
      assert pc.identity not in self._pool
358
      assert pc not in self._pool.values()
359
      self._pool[pc.identity] = pc
360

    
361
    # Check for unused clients
362
    for pc in self._pool.values():
363
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
364
        self._logger.debug("Removing client %s which hasn't been used"
365
                           " for %s generations",
366
                           pc, self._MAX_GENERATIONS_DROP)
367
        self._pool.pop(pc.identity, None)
368

    
369
    assert compat.all(pc.lastused >= (self._generation -
370
                                      self._MAX_GENERATIONS_DROP)
371
                      for pc in self._pool.values())
372

    
373
  @staticmethod
374
  def _CreateCurlMultiHandle():
375
    """Creates new cURL multi handle.
376

377
    """
378
    return pycurl.CurlMulti()
379

    
380
  def ProcessRequests(self, requests):
381
    """Processes any number of HTTP client requests using pooled objects.
382

383
    @type requests: list of L{HttpClientRequest}
384
    @param requests: List of all requests
385

386
    """
387
    multi = self._CreateCurlMultiHandle()
388

    
389
    # For client cleanup
390
    self._generation += 1
391

    
392
    assert compat.all((req.error is None and
393
                       req.success is None and
394
                       req.resp_status_code is None and
395
                       req.resp_body is None)
396
                      for req in requests)
397

    
398
    curl_to_pclient = {}
399
    for req in requests:
400
      pclient = self._StartRequest(req)
401
      curl = pclient.client.GetCurlHandle()
402
      curl_to_pclient[curl] = pclient
403
      multi.add_handle(curl)
404
      assert pclient.client.GetCurrentRequest() == req
405
      assert pclient.lastused >= 0
406

    
407
    assert len(curl_to_pclient) == len(requests)
408

    
409
    done_count = 0
410
    while True:
411
      (ret, _) = multi.perform()
412
      assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
413

    
414
      if ret == pycurl.E_CALL_MULTI_PERFORM:
415
        # cURL wants to be called again
416
        continue
417

    
418
      while True:
419
        (remaining_messages, successful, failed) = multi.info_read()
420

    
421
        for curl in successful:
422
          multi.remove_handle(curl)
423
          done_count += 1
424
          pclient = curl_to_pclient[curl]
425
          req = pclient.client.GetCurrentRequest()
426
          pclient.client.Done(None)
427
          assert req.success
428
          assert not pclient.client.GetCurrentRequest()
429

    
430
        for curl, errnum, errmsg in failed:
431
          multi.remove_handle(curl)
432
          done_count += 1
433
          pclient = curl_to_pclient[curl]
434
          req = pclient.client.GetCurrentRequest()
435
          pclient.client.Done("Error %s: %s" % (errnum, errmsg))
436
          assert req.error
437
          assert not pclient.client.GetCurrentRequest()
438

    
439
        if remaining_messages == 0:
440
          break
441

    
442
      assert done_count <= len(requests)
443

    
444
      if done_count == len(requests):
445
        break
446

    
447
      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
448
      # timeouts, which are only evaluated in multi.perform, aren't
449
      # unnecessarily delayed.
450
      multi.select(1.0)
451

    
452
    assert compat.all(pclient.client.GetCurrentRequest() is None
453
                      for pclient in curl_to_pclient.values())
454

    
455
    # Return clients to pool
456
    self._Return(curl_to_pclient.values())
457

    
458
    assert done_count == len(requests)
459
    assert compat.all(req.error is not None or
460
                      (req.success and
461
                       req.resp_status_code is not None and
462
                       req.resp_body is not None)
463
                      for req in requests)