Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ ecd61b4e

History | View | Annotate | Download (13 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import logging
26
import pycurl
27
from cStringIO import StringIO
28

    
29
from ganeti import http
30
from ganeti import compat
31
from ganeti import netutils
32

    
33

    
34
class HttpClientRequest(object):
35
  def __init__(self, host, port, method, path, headers=None, post_data=None,
36
               read_timeout=None, curl_config_fn=None):
37
    """Describes an HTTP request.
38

39
    @type host: string
40
    @param host: Hostname
41
    @type port: int
42
    @param port: Port
43
    @type method: string
44
    @param method: Method name
45
    @type path: string
46
    @param path: Request path
47
    @type headers: list or None
48
    @param headers: Additional headers to send, list of strings
49
    @type post_data: string or None
50
    @param post_data: Additional data to send
51
    @type read_timeout: int
52
    @param read_timeout: if passed, it will be used as the read
53
        timeout while reading the response from the server
54
    @type curl_config_fn: callable
55
    @param curl_config_fn: Function to configure cURL object before request
56
                           (Note: if the function configures the connection in
57
                           a way where it wouldn't be efficient to reuse them,
58
                           a "identity" property should be defined, see
59
                           L{HttpClientRequest.identity})
60

61
    """
62
    assert path.startswith("/"), "Path must start with slash (/)"
63
    assert curl_config_fn is None or callable(curl_config_fn)
64

    
65
    # Request attributes
66
    self.host = host
67
    self.port = port
68
    self.method = method
69
    self.path = path
70
    self.read_timeout = read_timeout
71
    self.curl_config_fn = curl_config_fn
72

    
73
    if post_data is None:
74
      self.post_data = ""
75
    else:
76
      self.post_data = post_data
77

    
78
    if headers is None:
79
      self.headers = []
80
    elif isinstance(headers, dict):
81
      # Support for old interface
82
      self.headers = ["%s: %s" % (name, value)
83
                      for name, value in headers.items()]
84
    else:
85
      self.headers = headers
86

    
87
    # Response status
88
    self.success = None
89
    self.error = None
90

    
91
    # Response attributes
92
    self.resp_status_code = None
93
    self.resp_body = None
94

    
95
  def __repr__(self):
96
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
97
              "%s:%s" % (self.host, self.port),
98
              self.method,
99
              self.path]
100

    
101
    return "<%s at %#x>" % (" ".join(status), id(self))
102

    
103
  @property
104
  def url(self):
105
    """Returns the full URL for this requests.
106

107
    """
108
    if netutils.IPAddress.IsValid(self.host):
109
      address = netutils.FormatAddress((self.host, self.port))
110
    else:
111
      address = "%s:%s" % (self.host, self.port)
112
    # TODO: Support for non-SSL requests
113
    return "https://%s%s" % (address, self.path)
114

    
115
  @property
116
  def identity(self):
117
    """Returns identifier for retrieving a pooled connection for this request.
118

119
    This allows cURL client objects to be re-used and to cache information
120
    (e.g. SSL session IDs or connections).
121

122
    """
123
    parts = [self.host, self.port]
124

    
125
    if self.curl_config_fn:
126
      try:
127
        parts.append(self.curl_config_fn.identity)
128
      except AttributeError:
129
        pass
130

    
131
    return "/".join(str(i) for i in parts)
132

    
133

    
134
class _HttpClient(object):
135
  def __init__(self, curl_config_fn):
136
    """Initializes this class.
137

138
    @type curl_config_fn: callable
139
    @param curl_config_fn: Function to configure cURL object after
140
                           initialization
141

142
    """
143
    self._req = None
144

    
145
    curl = self._CreateCurlHandle()
146
    curl.setopt(pycurl.VERBOSE, False)
147
    curl.setopt(pycurl.NOSIGNAL, True)
148
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
149
    curl.setopt(pycurl.PROXY, "")
150

    
151
    # Disable SSL session ID caching (pycurl >= 7.16.0)
152
    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
153
      curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
154

    
155
    # Pass cURL object to external config function
156
    if curl_config_fn:
157
      curl_config_fn(curl)
158

    
159
    self._curl = curl
160

    
161
  @staticmethod
162
  def _CreateCurlHandle():
163
    """Returns a new cURL object.
164

165
    """
166
    return pycurl.Curl()
167

    
168
  def GetCurlHandle(self):
169
    """Returns the cURL object.
170

171
    """
172
    return self._curl
173

    
174
  def GetCurrentRequest(self):
175
    """Returns the current request.
176

177
    @rtype: L{HttpClientRequest} or None
178

179
    """
180
    return self._req
181

    
182
  def StartRequest(self, req):
183
    """Starts a request on this client.
184

185
    @type req: L{HttpClientRequest}
186
    @param req: HTTP request
187

188
    """
189
    assert not self._req, "Another request is already started"
190

    
191
    logging.debug("Starting request %r", req)
192

    
193
    self._req = req
194
    self._resp_buffer = StringIO()
195

    
196
    url = req.url
197
    method = req.method
198
    post_data = req.post_data
199
    headers = req.headers
200

    
201
    # PycURL requires strings to be non-unicode
202
    assert isinstance(method, str)
203
    assert isinstance(url, str)
204
    assert isinstance(post_data, str)
205
    assert compat.all(isinstance(i, str) for i in headers)
206

    
207
    # Configure cURL object for request
208
    curl = self._curl
209
    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
210
    curl.setopt(pycurl.URL, url)
211
    curl.setopt(pycurl.POSTFIELDS, post_data)
212
    curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
213
    curl.setopt(pycurl.HTTPHEADER, headers)
214

    
215
    if req.read_timeout is None:
216
      curl.setopt(pycurl.TIMEOUT, 0)
217
    else:
218
      curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
219

    
220
    # Pass cURL object to external config function
221
    if req.curl_config_fn:
222
      req.curl_config_fn(curl)
223

    
224
  def Done(self, errmsg):
225
    """Finishes a request.
226

227
    @type errmsg: string or None
228
    @param errmsg: Error message if request failed
229

230
    """
231
    req = self._req
232
    assert req, "No request"
233

    
234
    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
235

    
236
    curl = self._curl
237

    
238
    req.success = not bool(errmsg)
239
    req.error = errmsg
240

    
241
    # Get HTTP response code
242
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
243
    req.resp_body = self._resp_buffer.getvalue()
244

    
245
    # Reset client object
246
    self._req = None
247
    self._resp_buffer = None
248

    
249
    # Ensure no potentially large variables are referenced
250
    curl.setopt(pycurl.POSTFIELDS, "")
251
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
252

    
253

    
254
class _PooledHttpClient:
255
  """Data structure for HTTP client pool.
256

257
  """
258
  def __init__(self, identity, client):
259
    """Initializes this class.
260

261
    @type identity: string
262
    @param identity: Client identifier for pool
263
    @type client: L{_HttpClient}
264
    @param client: HTTP client
265

266
    """
267
    self.identity = identity
268
    self.client = client
269
    self.lastused = 0
270

    
271
  def __repr__(self):
272
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
273
              "id=%s" % self.identity,
274
              "lastuse=%s" % self.lastused,
275
              repr(self.client)]
276

    
277
    return "<%s at %#x>" % (" ".join(status), id(self))
278

    
279

    
280
class HttpClientPool:
281
  """A simple HTTP client pool.
282

283
  Supports one pooled connection per identity (see
284
  L{HttpClientRequest.identity}).
285

286
  """
287
  #: After how many generations to drop unused clients
288
  _MAX_GENERATIONS_DROP = 25
289

    
290
  def __init__(self, curl_config_fn):
291
    """Initializes this class.
292

293
    @type curl_config_fn: callable
294
    @param curl_config_fn: Function to configure cURL object after
295
                           initialization
296

297
    """
298
    self._curl_config_fn = curl_config_fn
299
    self._generation = 0
300
    self._pool = {}
301

    
302
    # Create custom logger for HTTP client pool. Change logging level to
303
    # C{logging.NOTSET} to get more details.
304
    self._logger = logging.getLogger(self.__class__.__name__)
305
    self._logger.setLevel(logging.INFO)
306

    
307
  @staticmethod
308
  def _GetHttpClientCreator():
309
    """Returns callable to create HTTP client.
310

311
    """
312
    return _HttpClient
313

    
314
  def _Get(self, identity):
315
    """Gets an HTTP client from the pool.
316

317
    @type identity: string
318
    @param identity: Client identifier
319

320
    """
321
    try:
322
      pclient = self._pool.pop(identity)
323
    except KeyError:
324
      # Need to create new client
325
      client = self._GetHttpClientCreator()(self._curl_config_fn)
326
      pclient = _PooledHttpClient(identity, client)
327
      self._logger.debug("Created new client %s", pclient)
328
    else:
329
      self._logger.debug("Reusing client %s", pclient)
330

    
331
    assert pclient.identity == identity
332

    
333
    return pclient
334

    
335
  def _StartRequest(self, req):
336
    """Starts a request.
337

338
    @type req: L{HttpClientRequest}
339
    @param req: HTTP request
340

341
    """
342
    pclient = self._Get(req.identity)
343

    
344
    assert req.identity not in self._pool
345

    
346
    pclient.client.StartRequest(req)
347
    pclient.lastused = self._generation
348

    
349
    return pclient
350

    
351
  def _Return(self, pclients):
352
    """Returns HTTP clients to the pool.
353

354
    """
355
    assert not frozenset(pclients) & frozenset(self._pool.values())
356

    
357
    for pc in pclients:
358
      self._logger.debug("Returning client %s to pool", pc)
359
      assert pc.identity not in self._pool
360
      self._pool[pc.identity] = pc
361

    
362
    # Check for unused clients
363
    for pc in self._pool.values():
364
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
365
        self._logger.debug("Removing client %s which hasn't been used"
366
                           " for %s generations",
367
                           pc, self._MAX_GENERATIONS_DROP)
368
        self._pool.pop(pc.identity, None)
369

    
370
    assert compat.all(pc.lastused >= (self._generation -
371
                                      self._MAX_GENERATIONS_DROP)
372
                      for pc in self._pool.values())
373

    
374
  @staticmethod
375
  def _CreateCurlMultiHandle():
376
    """Creates new cURL multi handle.
377

378
    """
379
    return pycurl.CurlMulti()
380

    
381
  def ProcessRequests(self, requests):
382
    """Processes any number of HTTP client requests using pooled objects.
383

384
    @type requests: list of L{HttpClientRequest}
385
    @param requests: List of all requests
386

387
    """
388
    # For client cleanup
389
    self._generation += 1
390

    
391
    assert compat.all((req.error is None and
392
                       req.success is None and
393
                       req.resp_status_code is None and
394
                       req.resp_body is None)
395
                      for req in requests)
396

    
397
    curl_to_pclient = {}
398
    for req in requests:
399
      pclient = self._StartRequest(req)
400
      curl_to_pclient[pclient.client.GetCurlHandle()] = pclient
401
      assert pclient.client.GetCurrentRequest() == req
402
      assert pclient.lastused >= 0
403

    
404
    assert len(curl_to_pclient) == len(requests)
405

    
406
    # Process all requests and act based on the returned values
407
    for (curl, msg) in _ProcessCurlRequests(self._CreateCurlMultiHandle(),
408
                                            curl_to_pclient.keys()):
409
      pclient = curl_to_pclient[curl]
410
      req = pclient.client.GetCurrentRequest()
411
      pclient.client.Done(msg)
412

    
413
      assert ((msg is None and req.success and req.error is None) ^
414
              (msg is not None and not req.success and req.error == msg))
415

    
416
    assert compat.all(pclient.client.GetCurrentRequest() is None
417
                      for pclient in curl_to_pclient.values())
418

    
419
    # Return clients to pool
420
    self._Return(curl_to_pclient.values())
421

    
422
    assert compat.all(req.error is not None or
423
                      (req.success and
424
                       req.resp_status_code is not None and
425
                       req.resp_body is not None)
426
                      for req in requests)
427

    
428

    
429
def _ProcessCurlRequests(multi, requests):
430
  """cURL request processor.
431

432
  This generator yields a tuple once for every completed request, successful or
433
  not. The first value in the tuple is the handle, the second an error message
434
  or C{None} for successful requests.
435

436
  @type multi: C{pycurl.CurlMulti}
437
  @param multi: cURL multi object
438
  @type requests: sequence
439
  @param requests: cURL request handles
440

441
  """
442
  for curl in requests:
443
    multi.add_handle(curl)
444

    
445
  while True:
446
    (ret, active) = multi.perform()
447
    assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
448

    
449
    if ret == pycurl.E_CALL_MULTI_PERFORM:
450
      # cURL wants to be called again
451
      continue
452

    
453
    while True:
454
      (remaining_messages, successful, failed) = multi.info_read()
455

    
456
      for curl in successful:
457
        multi.remove_handle(curl)
458
        yield (curl, None)
459

    
460
      for curl, errnum, errmsg in failed:
461
        multi.remove_handle(curl)
462
        yield (curl, "Error %s: %s" % (errnum, errmsg))
463

    
464
      if remaining_messages == 0:
465
        break
466

    
467
    if active == 0:
468
      # No active handles anymore
469
      break
470

    
471
    # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
472
    # timeouts, which are only evaluated in multi.perform, aren't
473
    # unnecessarily delayed.
474
    multi.select(1.0)