Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ a3c10d31

History | View | Annotate | Download (12.7 kB)

#
#

# Copyright (C) 2007, 2008, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""HTTP client module.

"""

25
import logging
26
import pycurl
27
from cStringIO import StringIO
28

    
29
from ganeti import http
30
from ganeti import compat
31
from ganeti import netutils
32

    
33

    
34
class HttpClientRequest(object):
  """Container for one HTTP request and, once processed, its response.

  The constructor only stores the request parameters; the response
  attributes (C{success}, C{error}, C{resp_status_code}, C{resp_body}) start
  out as C{None} and are filled in by whoever processes the request.

  """
  def __init__(self, host, port, method, path, headers=None, post_data=None,
               read_timeout=None, curl_config_fn=None):
    """Describes an HTTP request.

    @type host: string
    @param host: Hostname
    @type port: int
    @param port: Port
    @type method: string
    @param method: Method name
    @type path: string
    @param path: Request path
    @type headers: list or None
    @param headers: Additional headers to send, list of strings
    @type post_data: string or None
    @param post_data: Additional data to send
    @type read_timeout: int
    @param read_timeout: if passed, it will be used as the read
        timeout while reading the response from the server
    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object before request
                           (Note: if the function configures the connection in
                           a way where it wouldn't be efficient to reuse them,
                           a "identity" property should be defined, see
                           L{HttpClientRequest.identity})

    """
    assert path.startswith("/"), "Path must start with slash (/)"
    assert curl_config_fn is None or callable(curl_config_fn)

    # Request attributes
    self.host = host
    self.port = port
    self.method = method
    self.path = path
    self.read_timeout = read_timeout
    self.curl_config_fn = curl_config_fn

    # A missing body is normalized to the empty string
    if post_data is None:
      self.post_data = ""
    else:
      self.post_data = post_data

    if headers is None:
      self.headers = []
    elif isinstance(headers, dict):
      # Support for old interface: turn a name/value mapping into a list of
      # "Name: value" strings
      self.headers = ["%s: %s" % (name, value)
                      for name, value in headers.items()]
    else:
      self.headers = headers

    # Response status
    self.success = None
    self.error = None

    # Response attributes
    self.resp_status_code = None
    self.resp_body = None

  def __repr__(self):
    """Returns a debug representation with class, target, method and path.

    """
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "%s:%s" % (self.host, self.port),
              self.method,
              self.path]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @property
  def url(self):
    """Returns the full URL for this request.

    Hosts recognized as IP addresses by L{netutils.IPAddress.IsValid} are
    formatted through L{netutils.FormatAddress}; anything else is joined as
    C{host:port}. The scheme is always C{https}.

    @rtype: string

    """
    if netutils.IPAddress.IsValid(self.host):
      address = netutils.FormatAddress((self.host, self.port))
    else:
      address = "%s:%s" % (self.host, self.port)
    # TODO: Support for non-SSL requests
    return "https://%s%s" % (address, self.path)

  @property
  def identity(self):
    """Returns identifier for retrieving a pooled connection for this request.

    This allows cURL client objects to be re-used and to cache information
    (e.g. SSL session IDs or connections).

    The identifier is made of host and port, followed by the C{identity}
    attribute of C{curl_config_fn} if that function is set and has one; all
    parts are converted to strings and joined with slashes.

    @rtype: string

    """
    parts = [self.host, self.port]

    if self.curl_config_fn:
      try:
        parts.append(self.curl_config_fn.identity)
      except AttributeError:
        # Configuration function doesn't declare an identity; host and port
        # alone identify the connection
        pass

    return "/".join(str(i) for i in parts)
132

    
133

    
134
class _HttpClient(object):
  """Wrapper around one cURL handle that processes one request at a time.

  A request is attached with L{StartRequest}, which configures the cURL
  handle, and detached with L{Done}, which copies the outcome back into the
  request object. Actually driving the transfer is left to the caller.

  """
  def __init__(self, curl_config_fn):
    """Initializes this class.

    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object after
                           initialization

    """
    self._req = None
    # Defined here so the attribute exists before the first request; it is
    # replaced in StartRequest and reset to None in Done
    self._resp_buffer = None

    curl = self._CreateCurlHandle()
    curl.setopt(pycurl.VERBOSE, False)
    curl.setopt(pycurl.NOSIGNAL, True)
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
    curl.setopt(pycurl.PROXY, "")

    # Disable SSL session ID caching (pycurl >= 7.16.0)
    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
      curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)

    # Pass cURL object to external config function
    if curl_config_fn:
      curl_config_fn(curl)

    self._curl = curl

  @staticmethod
  def _CreateCurlHandle():
    """Returns a new cURL object.

    """
    return pycurl.Curl()

  def GetCurlHandle(self):
    """Returns the cURL object.

    """
    return self._curl

  def GetCurrentRequest(self):
    """Returns the current request.

    @rtype: L{HttpClientRequest} or None

    """
    return self._req

  def StartRequest(self, req):
    """Starts a request on this client.

    Remembers the request, creates a fresh response buffer and configures
    the cURL handle (method, URL, body, headers, timeout) for it. The
    request's own C{curl_config_fn}, if any, is called last.

    @type req: L{HttpClientRequest}
    @param req: HTTP request

    """
    assert not self._req, "Another request is already started"

    logging.debug("Starting request %r", req)

    self._req = req
    self._resp_buffer = StringIO()

    url = req.url
    method = req.method
    post_data = req.post_data
    headers = req.headers

    # PycURL requires strings to be non-unicode
    assert isinstance(method, str)
    assert isinstance(url, str)
    assert isinstance(post_data, str)
    assert compat.all(isinstance(i, str) for i in headers)

    # Configure cURL object for request
    curl = self._curl
    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
    curl.setopt(pycurl.URL, url)
    curl.setopt(pycurl.POSTFIELDS, post_data)
    curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
    curl.setopt(pycurl.HTTPHEADER, headers)

    # Without a read timeout the cURL timeout is set to zero
    if req.read_timeout is None:
      curl.setopt(pycurl.TIMEOUT, 0)
    else:
      curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))

    # Pass cURL object to external config function
    if req.curl_config_fn:
      req.curl_config_fn(curl)

  def Done(self, errmsg):
    """Finishes a request.

    Stores success flag, error message, HTTP status code and response body
    in the current request and detaches it from this client.

    @type errmsg: string or None
    @param errmsg: Error message if request failed

    """
    req = self._req
    assert req, "No request"

    logging.debug("Request %s finished, errmsg=%s", req, errmsg)

    curl = self._curl

    req.success = not bool(errmsg)
    req.error = errmsg

    # Get HTTP response code
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
    req.resp_body = self._resp_buffer.getvalue()

    # Reset client object
    self._req = None
    self._resp_buffer = None

    # Ensure no potentially large variables are referenced
    curl.setopt(pycurl.POSTFIELDS, "")
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
252

    
253

    
254
class _PooledHttpClient:
  """Data structure for HTTP client pool.

  Ties an HTTP client to the identity it is pooled under and records the
  pool generation in which it was last used.

  """
  def __init__(self, identity, client):
    """Initializes this class.

    @type identity: string
    @param identity: Client identifier for pool
    @type client: L{_HttpClient}
    @param client: HTTP client

    """
    self.identity = identity
    self.client = client
    # Generation of last use; updated by the pool when a request is started
    self.lastused = 0

  def __repr__(self):
    """Returns a debug representation with identity, last use and client.

    """
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.identity,
              "lastuse=%s" % self.lastused,
              repr(self.client)]

    return "<%s at %#x>" % (" ".join(status), id(self))
278

    
279

    
280
class HttpClientPool:
  """A simple HTTP client pool.

  Supports one pooled connection per identity (see
  L{HttpClientRequest.identity}).

  Every call to L{ProcessRequests} starts a new "generation"; clients which
  haven't been used for more than L{_MAX_GENERATIONS_DROP} generations are
  removed from the pool when clients are returned to it.

  """
  #: After how many generations to drop unused clients
  _MAX_GENERATIONS_DROP = 25

  def __init__(self, curl_config_fn):
    """Initializes this class.

    @type curl_config_fn: callable
    @param curl_config_fn: Function to configure cURL object after
                           initialization

    """
    self._curl_config_fn = curl_config_fn
    self._generation = 0
    self._pool = {}

    # Create custom logger for HTTP client pool. Change logging level to
    # C{logging.NOTSET} to get more details.
    self._logger = logging.getLogger(self.__class__.__name__)
    self._logger.setLevel(logging.INFO)

  @staticmethod
  def _GetHttpClientCreator():
    """Returns callable to create HTTP client.

    """
    return _HttpClient

  def _Get(self, identity):
    """Gets an HTTP client from the pool.

    The client is removed from the pool; if there is none for the given
    identity, a new one is created.

    @type identity: string
    @param identity: Client identifier
    @rtype: L{_PooledHttpClient}

    """
    try:
      pclient = self._pool.pop(identity)
    except KeyError:
      # Need to create new client
      client = self._GetHttpClientCreator()(self._curl_config_fn)
      pclient = _PooledHttpClient(identity, client)
      self._logger.debug("Created new client %s", pclient)
    else:
      self._logger.debug("Reusing client %s", pclient)

    assert pclient.identity == identity

    return pclient

  def _StartRequest(self, req):
    """Starts a request.

    @type req: L{HttpClientRequest}
    @param req: HTTP request
    @rtype: L{_PooledHttpClient}
    @return: Pooled client on which the request was started

    """
    pclient = self._Get(req.identity)

    assert req.identity not in self._pool

    pclient.client.StartRequest(req)
    pclient.lastused = self._generation

    return pclient

  def _Return(self, pclients):
    """Returns HTTP clients to the pool.

    Only one client is kept per identity. If several returned clients share
    an identity (which happens when one batch contained more than one
    request for the same identity), the first one is pooled and the others
    are discarded. Afterwards clients unused for more than
    L{_MAX_GENERATIONS_DROP} generations are removed.

    @type pclients: iterable of L{_PooledHttpClient}
    @param pclients: Clients which are no longer in use

    """
    assert not frozenset(pclients) & frozenset(self._pool.values())

    for pc in pclients:
      if pc.identity in self._pool:
        # The pool holds a single client per identity; keep the one already
        # there instead of failing after all requests have been processed
        self._logger.debug("Discarding surplus client %s, identity is"
                           " already pooled", pc)
        continue

      self._logger.debug("Returning client %s to pool", pc)
      self._pool[pc.identity] = pc

    # Check for unused clients. The stale ones are collected first so the
    # pool isn't modified while it is being iterated.
    oldest_allowed = self._generation - self._MAX_GENERATIONS_DROP
    stale = [pc for pc in self._pool.values()
             if pc.lastused < oldest_allowed]

    for pc in stale:
      self._logger.debug("Removing client %s which hasn't been used"
                         " for %s generations",
                         pc, self._MAX_GENERATIONS_DROP)
      self._pool.pop(pc.identity, None)

    assert not [pc for pc in self._pool.values()
                if pc.lastused < oldest_allowed]

  @staticmethod
  def _CreateCurlMultiHandle():
    """Creates new cURL multi handle.

    """
    return pycurl.CurlMulti()

  def ProcessRequests(self, requests):
    """Processes any number of HTTP client requests using pooled objects.

    All requests are started on (possibly re-used) clients, driven through a
    single cURL multi handle until each has either succeeded or failed, and
    the clients are returned to the pool. Results are stored in the request
    objects themselves.

    @type requests: list of L{HttpClientRequest}
    @param requests: List of all requests

    """
    multi = self._CreateCurlMultiHandle()

    # For client cleanup
    self._generation += 1

    # Requests must not have been processed before
    assert compat.all((req.error is None and
                       req.success is None and
                       req.resp_status_code is None and
                       req.resp_body is None)
                      for req in requests)

    # Map from cURL handle to the pooled client owning it
    curl_to_pclient = {}
    for req in requests:
      pclient = self._StartRequest(req)
      curl = pclient.client.GetCurlHandle()
      curl_to_pclient[curl] = pclient
      multi.add_handle(curl)
      assert pclient.client.GetCurrentRequest() == req
      assert pclient.lastused >= 0

    assert len(curl_to_pclient) == len(requests)

    done_count = 0
    while True:
      (ret, _) = multi.perform()
      assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)

      if ret == pycurl.E_CALL_MULTI_PERFORM:
        # cURL wants to be called again
        continue

      # Drain all pending completion messages
      while True:
        (remaining_messages, successful, failed) = multi.info_read()

        for curl in successful:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done(None)
          assert req.success
          assert not pclient.client.GetCurrentRequest()

        for curl, errnum, errmsg in failed:
          multi.remove_handle(curl)
          done_count += 1
          pclient = curl_to_pclient[curl]
          req = pclient.client.GetCurrentRequest()
          pclient.client.Done("Error %s: %s" % (errnum, errmsg))
          assert req.error
          assert not pclient.client.GetCurrentRequest()

        if remaining_messages == 0:
          break

      assert done_count <= len(requests)

      if done_count == len(requests):
        break

      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
      # timeouts, which are only evaluated in multi.perform, aren't
      # unnecessarily delayed.
      multi.select(1.0)

    assert compat.all(pclient.client.GetCurrentRequest() is None
                      for pclient in curl_to_pclient.values())

    # Return clients to pool
    self._Return(curl_to_pclient.values())

    assert done_count == len(requests)
    assert compat.all(req.error is not None or
                      (req.success and
                       req.resp_status_code is not None and
                       req.resp_body is not None)
                      for req in requests)