Statistics
| Branch: | Tag: | Revision:

root / lib / http / client.py @ 7b70d7a8

History | View | Annotate | Download (12.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""HTTP client module.
22

23
"""
24

    
25
import logging
26
import pycurl
27
from cStringIO import StringIO
28

    
29
from ganeti import http
30
from ganeti import compat
31
from ganeti import netutils
32

    
33

    
34
class HttpClientRequest(object):
35
  def __init__(self, host, port, method, path, headers=None, post_data=None,
36
               read_timeout=None, curl_config_fn=None):
37
    """Describes an HTTP request.
38

39
    @type host: string
40
    @param host: Hostname
41
    @type port: int
42
    @param port: Port
43
    @type method: string
44
    @param method: Method name
45
    @type path: string
46
    @param path: Request path
47
    @type headers: list or None
48
    @param headers: Additional headers to send, list of strings
49
    @type post_data: string or None
50
    @param post_data: Additional data to send
51
    @type read_timeout: int
52
    @param read_timeout: if passed, it will be used as the read
53
        timeout while reading the response from the server
54
    @type curl_config_fn: callable
55
    @param curl_config_fn: Function to configure cURL object before request
56
                           (Note: if the function configures the connection in
57
                           a way where it wouldn't be efficient to reuse them,
58
                           a "identity" property should be defined, see
59
                           L{HttpClientRequest.identity})
60

61
    """
62
    assert path.startswith("/"), "Path must start with slash (/)"
63
    assert curl_config_fn is None or callable(curl_config_fn)
64

    
65
    # Request attributes
66
    self.host = host
67
    self.port = port
68
    self.method = method
69
    self.path = path
70
    self.read_timeout = read_timeout
71
    self.curl_config_fn = curl_config_fn
72

    
73
    if post_data is None:
74
      self.post_data = ""
75
    else:
76
      self.post_data = post_data
77

    
78
    if headers is None:
79
      self.headers = []
80
    elif isinstance(headers, dict):
81
      # Support for old interface
82
      self.headers = ["%s: %s" % (name, value)
83
                      for name, value in headers.items()]
84
    else:
85
      self.headers = headers
86

    
87
    # Response status
88
    self.success = None
89
    self.error = None
90

    
91
    # Response attributes
92
    self.resp_status_code = None
93
    self.resp_body = None
94

    
95
  def __repr__(self):
96
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
97
              "%s:%s" % (self.host, self.port),
98
              self.method,
99
              self.path]
100

    
101
    return "<%s at %#x>" % (" ".join(status), id(self))
102

    
103
  @property
104
  def url(self):
105
    """Returns the full URL for this requests.
106

107
    """
108
    if netutils.IPAddress.IsValid(self.host):
109
      address = netutils.FormatAddress((self.host, self.port))
110
    else:
111
      address = "%s:%s" % (self.host, self.port)
112
    # TODO: Support for non-SSL requests
113
    return "https://%s%s" % (address, self.path)
114

    
115
  @property
116
  def identity(self):
117
    """Returns identifier for retrieving a pooled connection for this request.
118

119
    This allows cURL client objects to be re-used and to cache information
120
    (e.g. SSL session IDs or connections).
121

122
    """
123
    parts = [self.host, self.port]
124

    
125
    if self.curl_config_fn:
126
      try:
127
        parts.append(self.curl_config_fn.identity)
128
      except AttributeError:
129
        pass
130

    
131
    return "/".join(str(i) for i in parts)
132

    
133

    
134
class _HttpClient(object):
135
  def __init__(self, curl_config_fn):
136
    """Initializes this class.
137

138
    @type curl_config_fn: callable
139
    @param curl_config_fn: Function to configure cURL object after
140
                           initialization
141

142
    """
143
    self._req = None
144

    
145
    curl = self._CreateCurlHandle()
146
    curl.setopt(pycurl.VERBOSE, False)
147
    curl.setopt(pycurl.NOSIGNAL, True)
148
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
149
    curl.setopt(pycurl.PROXY, "")
150

    
151
    # Disable SSL session ID caching (pycurl >= 7.16.0)
152
    if hasattr(pycurl, "SSL_SESSIONID_CACHE"):
153
      curl.setopt(pycurl.SSL_SESSIONID_CACHE, False)
154

    
155
    # Pass cURL object to external config function
156
    if curl_config_fn:
157
      curl_config_fn(curl)
158

    
159
    self._curl = curl
160

    
161
  @staticmethod
162
  def _CreateCurlHandle():
163
    """Returns a new cURL object.
164

165
    """
166
    return pycurl.Curl()
167

    
168
  def GetCurlHandle(self):
169
    """Returns the cURL object.
170

171
    """
172
    return self._curl
173

    
174
  def GetCurrentRequest(self):
175
    """Returns the current request.
176

177
    @rtype: L{HttpClientRequest} or None
178

179
    """
180
    return self._req
181

    
182
  def StartRequest(self, req):
183
    """Starts a request on this client.
184

185
    @type req: L{HttpClientRequest}
186
    @param req: HTTP request
187

188
    """
189
    assert not self._req, "Another request is already started"
190

    
191
    self._req = req
192
    self._resp_buffer = StringIO()
193

    
194
    url = req.url
195
    method = req.method
196
    post_data = req.post_data
197
    headers = req.headers
198

    
199
    # PycURL requires strings to be non-unicode
200
    assert isinstance(method, str)
201
    assert isinstance(url, str)
202
    assert isinstance(post_data, str)
203
    assert compat.all(isinstance(i, str) for i in headers)
204

    
205
    # Configure cURL object for request
206
    curl = self._curl
207
    curl.setopt(pycurl.CUSTOMREQUEST, str(method))
208
    curl.setopt(pycurl.URL, url)
209
    curl.setopt(pycurl.POSTFIELDS, post_data)
210
    curl.setopt(pycurl.WRITEFUNCTION, self._resp_buffer.write)
211
    curl.setopt(pycurl.HTTPHEADER, headers)
212

    
213
    if req.read_timeout is None:
214
      curl.setopt(pycurl.TIMEOUT, 0)
215
    else:
216
      curl.setopt(pycurl.TIMEOUT, int(req.read_timeout))
217

    
218
    # Pass cURL object to external config function
219
    if req.curl_config_fn:
220
      req.curl_config_fn(curl)
221

    
222
  def Done(self, errmsg):
223
    """Finishes a request.
224

225
    @type errmsg: string or None
226
    @param errmsg: Error message if request failed
227

228
    """
229
    req = self._req
230
    assert req, "No request"
231

    
232
    logging.debug("Request %s finished, errmsg=%s", req, errmsg)
233

    
234
    curl = self._curl
235

    
236
    req.success = not bool(errmsg)
237
    req.error = errmsg
238

    
239
    # Get HTTP response code
240
    req.resp_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
241
    req.resp_body = self._resp_buffer.getvalue()
242

    
243
    # Reset client object
244
    self._req = None
245
    self._resp_buffer = None
246

    
247
    # Ensure no potentially large variables are referenced
248
    curl.setopt(pycurl.POSTFIELDS, "")
249
    curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)
250

    
251

    
252
class _PooledHttpClient:
253
  """Data structure for HTTP client pool.
254

255
  """
256
  def __init__(self, identity, client):
257
    """Initializes this class.
258

259
    @type identity: string
260
    @param identity: Client identifier for pool
261
    @type client: L{_HttpClient}
262
    @param client: HTTP client
263

264
    """
265
    self.identity = identity
266
    self.client = client
267
    self.lastused = 0
268

    
269
  def __repr__(self):
270
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
271
              "id=%s" % self.identity,
272
              "lastuse=%s" % self.lastused,
273
              repr(self.client)]
274

    
275
    return "<%s at %#x>" % (" ".join(status), id(self))
276

    
277

    
278
class HttpClientPool:
279
  """A simple HTTP client pool.
280

281
  Supports one pooled connection per identity (see
282
  L{HttpClientRequest.identity}).
283

284
  """
285
  #: After how many generations to drop unused clients
286
  _MAX_GENERATIONS_DROP = 25
287

    
288
  def __init__(self, curl_config_fn):
289
    """Initializes this class.
290

291
    @type curl_config_fn: callable
292
    @param curl_config_fn: Function to configure cURL object after
293
                           initialization
294

295
    """
296
    self._curl_config_fn = curl_config_fn
297
    self._generation = 0
298
    self._pool = {}
299

    
300
  @staticmethod
301
  def _GetHttpClientCreator():
302
    """Returns callable to create HTTP client.
303

304
    """
305
    return _HttpClient
306

    
307
  def _Get(self, identity):
308
    """Gets an HTTP client from the pool.
309

310
    @type identity: string
311
    @param identity: Client identifier
312

313
    """
314
    try:
315
      pclient  = self._pool.pop(identity)
316
    except KeyError:
317
      # Need to create new client
318
      client = self._GetHttpClientCreator()(self._curl_config_fn)
319
      pclient = _PooledHttpClient(identity, client)
320
      logging.debug("Created new client %s", pclient)
321
    else:
322
      logging.debug("Reusing client %s", pclient)
323

    
324
    assert pclient.identity == identity
325

    
326
    return pclient
327

    
328
  def _StartRequest(self, req):
329
    """Starts a request.
330

331
    @type req: L{HttpClientRequest}
332
    @param req: HTTP request
333

334
    """
335
    logging.debug("Starting request %r", req)
336
    pclient = self._Get(req.identity)
337

    
338
    assert req.identity not in self._pool
339

    
340
    pclient.client.StartRequest(req)
341
    pclient.lastused = self._generation
342

    
343
    return pclient
344

    
345
  def _Return(self, pclients):
346
    """Returns HTTP clients to the pool.
347

348
    """
349
    for pc in pclients:
350
      logging.debug("Returning client %s to pool", pc)
351
      assert pc.identity not in self._pool
352
      assert pc not in self._pool.values()
353
      self._pool[pc.identity] = pc
354

    
355
    # Check for unused clients
356
    for pc in self._pool.values():
357
      if (pc.lastused + self._MAX_GENERATIONS_DROP) < self._generation:
358
        logging.debug("Removing client %s which hasn't been used"
359
                      " for %s generations",
360
                      pc, self._MAX_GENERATIONS_DROP)
361
        self._pool.pop(pc.identity, None)
362

    
363
    assert compat.all(pc.lastused >= (self._generation -
364
                                      self._MAX_GENERATIONS_DROP)
365
                      for pc in self._pool.values())
366

    
367
  @staticmethod
368
  def _CreateCurlMultiHandle():
369
    """Creates new cURL multi handle.
370

371
    """
372
    return pycurl.CurlMulti()
373

    
374
  def ProcessRequests(self, requests):
375
    """Processes any number of HTTP client requests using pooled objects.
376

377
    @type requests: list of L{HttpClientRequest}
378
    @param requests: List of all requests
379

380
    """
381
    multi = self._CreateCurlMultiHandle()
382

    
383
    # For client cleanup
384
    self._generation += 1
385

    
386
    assert compat.all((req.error is None and
387
                       req.success is None and
388
                       req.resp_status_code is None and
389
                       req.resp_body is None)
390
                      for req in requests)
391

    
392
    curl_to_pclient = {}
393
    for req in requests:
394
      pclient = self._StartRequest(req)
395
      curl = pclient.client.GetCurlHandle()
396
      curl_to_pclient[curl] = pclient
397
      multi.add_handle(curl)
398
      assert pclient.client.GetCurrentRequest() == req
399
      assert pclient.lastused >= 0
400

    
401
    assert len(curl_to_pclient) == len(requests)
402

    
403
    done_count = 0
404
    while True:
405
      (ret, _) = multi.perform()
406
      assert ret in (pycurl.E_MULTI_OK, pycurl.E_CALL_MULTI_PERFORM)
407

    
408
      if ret == pycurl.E_CALL_MULTI_PERFORM:
409
        # cURL wants to be called again
410
        continue
411

    
412
      while True:
413
        (remaining_messages, successful, failed) = multi.info_read()
414

    
415
        for curl in successful:
416
          multi.remove_handle(curl)
417
          done_count += 1
418
          pclient = curl_to_pclient[curl]
419
          req = pclient.client.GetCurrentRequest()
420
          pclient.client.Done(None)
421
          assert req.success
422
          assert not pclient.client.GetCurrentRequest()
423

    
424
        for curl, errnum, errmsg in failed:
425
          multi.remove_handle(curl)
426
          done_count += 1
427
          pclient = curl_to_pclient[curl]
428
          req = pclient.client.GetCurrentRequest()
429
          pclient.client.Done("Error %s: %s" % (errnum, errmsg))
430
          assert req.error
431
          assert not pclient.client.GetCurrentRequest()
432

    
433
        if remaining_messages == 0:
434
          break
435

    
436
      assert done_count <= len(requests)
437

    
438
      if done_count == len(requests):
439
        break
440

    
441
      # Wait for I/O. The I/O timeout shouldn't be too long so that HTTP
442
      # timeouts, which are only evaluated in multi.perform, aren't
443
      # unnecessarily delayed.
444
      multi.select(1.0)
445

    
446
    assert compat.all(pclient.client.GetCurrentRequest() is None
447
                      for pclient in curl_to_pclient.values())
448

    
449
    # Return clients to pool
450
    self._Return(curl_to_pclient.values())
451

    
452
    assert done_count == len(requests)
453
    assert compat.all(req.error is not None or
454
                      (req.success and
455
                       req.resp_status_code is not None and
456
                       req.resp_body is not None)
457
                      for req in requests)